RocketMQ(消息重發、重復消費、事務、消息模式)


分布式開放消息系統(RocketMQ)的原理與實踐

 RocketMQ基礎:https://github.com/apache/rocketmq/tree/rocketmq-all-4.5.1/docs/cn

分布式消息系統作為實現分布式系統可擴展、可伸縮性的關鍵組件,需要具有高吞吐量、高可用等特點。而談到消息系統的設計,就回避不了兩個問題:

  1. 消息的順序問題
  2. 消息的重復問題

RocketMQ作為阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?

關鍵特性以及其實現原理

一、順序消息

消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單創建、訂單付款、訂單完成。消費時,要按照這個順序消費才有意義。但同時訂單之間又是可以並行消費的。

假如生產者產生了2條消息:M1、M2,要保證這兩條消息的順序,應該怎樣做?你腦中想到的可能是這樣:


你可能會采用這種方式保證消息順序


M1發送到S1后,M2發送到S2,如果要保證M1先於M2被消費,那么需要M1到達消費端后,通知S2,然后S2再將M2發送到消費端。

這個模型存在的問題是,如果M1和M2分別發送到兩台Server上,就不能保證M1先達到,也就不能保證M1被先消費,那么就需要在MQ Server集群維護消息的順序。那么如何解決?一種簡單的方式就是將M1、M2發送到同一個Server上:


保證消息順序,你改進后的方法


這樣可以保證M1先於M2到達MQServer(客戶端等待M1成功后再發送M2),根據先達到先被消費的原則,M1會先於M2被消費,這樣就保證了消息的順序。

這個模型,理論上可以保證消息的順序,但在實際運用中你應該會遇到下面的問題:


網絡延遲問題

只要將消息從一台服務器發往另一台服務器,就會存在網絡延遲問題。如上圖所示,如果發送M1耗時大於發送M2的耗時,那么M2就先被消費,仍然不能保證消息的順序。即使M1和M2同時到達消費端,由於不清楚消費端1和消費端2的負載情況,仍然有可能出現M2先於M1被消費。如何解決這個問題?將M1和M2發往同一個消費者即可,且發送M1后,需要消費端響應成功后才能發送M2。

但又會引入另外一個問題,如果發送M1后,消費端1沒有響應,那是繼續發送M2呢,還是重新發送M1?一般為了保證消息一定被消費,肯定會選擇重發M1到另外一個消費端2,就如下圖所示。


保證消息順序的正確姿勢

這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種情況,一種是M1確實沒有到達,另外一種情況是消費端1已經響應,但是Server端沒有收到。如果是第二種情況,重發M1,就會造成M1被重復消費。也就是我們后面要說的第二個問題,消息重復問題。

回過頭來看消息順序問題,嚴格的順序消息非常容易理解,而且處理問題也比較容易,要實現嚴格的順序消息,簡單且可行的辦法就是:

保證生產者 - MQServer - 消費者是一對一對一的關系

但是這樣設計,並行度就成為了消息系統的瓶頸(吞吐量不夠),也會導致更多的異常處理,比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。

但我們的最終目標是要集群的高容錯性和高吞吐量。這似乎是一對不可調和的矛盾,那么阿里是如何解決的?

世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它!

有些問題,看起來很重要,但實際上我們可以通過合理的設計或者將問題分解來規避。如果硬要把時間花在解決它們身上,實際上是浪費的,效率低下的。從這個角度來看消息的順序問題,我們可以得出兩個結論:

1、不關注亂序的應用實際大量存在
2、隊列無序並不意味着消息無序

最后我們從源碼角度分析RocketMQ怎么實現發送順序消息。

一般消息是通過輪詢所有隊列來發送的(負載均衡策略),順序消息可以根據業務,比如說訂單號相同的消息發送到同一個隊列。下面的示例中,OrderId相同的消息,會發送到同一個隊列:

復制代碼
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);
復制代碼

在獲取到路由信息以后,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的隊列是同一個隊列。

復制代碼
private SendResult send()  {
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根據我們的算法,選擇一個發送隊列
        // 這里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}
復制代碼
二、消息重復

上面在解決消息順序問題時,引入了一個新的問題,就是消息重復。那么RocketMQ是怎樣解決消息重復的問題呢?還是“恰好”不解決。

造成消息的重復的根本原因是:網絡不可達。只要通過網絡交換數據,就無法避免這個問題。所以解決這個問題的辦法就是不解決,轉而繞過這個問題。那么問題就變成了:如果消費端收到兩條一樣的消息,應該怎樣處理?

1、消費端處理消息的業務邏輯保持冪等性
2、保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現

第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。

我們可以看到第1條的解決方式,很明顯應該在消費端實現,不屬於消息系統要實現的功能。第2條可以消息系統實現,也可以業務端實現。正常情況下出現重復消息的概率不一定大,且由消息系統實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的原因。

RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。

三、事務消息

RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什么是事務消息以及支持事務消息的必要性。我們以一個轉帳的場景為例來說明這個問題:Bob向Smith轉賬100塊。

在單機環境下,執行事務的情況,大概是下面這個樣子:


單機環境下轉賬事務示意圖

當用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經不在同一台服務器上了,那么上面的流程就變成了這樣:


集群環境下轉賬事務示意圖

這時候你會發現,同樣是一個轉賬的業務,在集群環境下,耗時居然成倍的增長,這顯然是不能夠接受的。那我們如何來規避這個問題?

大事務 = 小事務 + 異步

將大事務拆分成多個小事務異步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。轉賬的事務就可以分解成如下兩個小事務:


小事務+異步消息


圖中執行本地事務(Bob賬戶扣款)和發送異步消息應該保持同時成功或者失敗中,也就是扣款成功了,發送消息一定要成功,如果扣款失敗了,就不能再發送消息。那問題是:我們是先扣款還是先發送消息呢?

首先我們看下,先發送消息,大致的示意圖如下:


事務消息:先發送消息

存在的問題是:如果消息發送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。

先發消息不行,那我們就先扣款唄,大致的示意圖如下:


事務消息-先扣款

存在的問題跟上面類似:如果扣款成功,發送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。

可能大家會有很多的方法來解決這個問題,比如:直接將發消息放到Bob扣款的事務中去,如果發送失敗,拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。RocketMQ支持事務消息,下面我們來看看RocketMQ是怎樣來實現的。


RocketMQ實現發送事務消息

RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

那我們來看下RocketMQ源碼,是不是這樣來處理事務消息的。客戶端發送事務消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

按 Ctrl+C 復制代碼
按 Ctrl+C 復制代碼

接着查看sendMessageInTransaction方法的源碼,總共分為3個階段:發送Prepared消息、執行本地事務、發送確認消息。

復制代碼
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 邏輯代碼,非實際代碼
    // 1.發送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息發送成功,處理與消息關聯的本地事務單元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.結束事務
    this.endTransaction(sendResult, localTransactionState, localException);
}
復制代碼

endTransaction方法會將請求發往broker(mq server)去更新事物消息的最終狀態:

  1. 根據sendResult找到Prepared消息
  2. 根據localTransaction更新消息的最終狀態

如果endTransaction方法執行失敗,導致數據沒有發送到brokerbroker會有回查線程定時(默認1分鍾)掃描每個存儲事務狀態的表格文件,如果是已經提交或者回滾的消息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用我們的事務設置的決斷方法,最后調用endTransactionOnewaybroker來更新消息的最終狀態。

再回到轉賬的例子,如果Bob的賬戶的余額已經減少,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程中有可能會出現消息重復的問題,按照前面的思路解決即可。


消費事務消息

這樣基本上可以解決超時問題,但是如果消費失敗怎么辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,需要回滾整個流程。如果消息系統要實現這個回滾流程的話,系統復雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。我們需要衡量是否值得花這么大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。

20160321補充:在3.2.6版本中移除了事務消息的實現,所以此版本不支持事務消息,具體情況請參考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156

四、Producer如何發送消息

Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡,如下圖所示:


producer發送消息負載均衡


首先分析一下RocketMQ的客戶端發送消息的源碼:

復制代碼
// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命周期內,只需要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:給消息打標簽,用於區分一類消息,可為null
                        "OrderID188",// key:自定義Key,可以用於去重,可為null
                        ("Hello MetaQ").getBytes());// body:消息內容
// 發送消息並返回結果
SendResult sendResult = producer.send(msg);
// 清理資源,關閉網絡連接,注銷自己
producer.shutdown();
復制代碼

在整個應用生命周期內,生產者需要調用一次start方法來初始化,初始化主要完成的任務有:

  1. 如果沒有指定namesrv地址,將會自動尋址
  2. 啟動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向所有broker發送心跳...
  3. 啟動負載均衡的服務

初始化完成后,開始發送消息,發送消息的主要代碼如下:

復制代碼
private SendResult sendDefaultImpl(Message msg,......) {
    // 檢查Producer的狀態是否是RUNNING
    this.makeSureStateOK();
    // 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 從路由信息中選擇一個消息隊列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 將消息發送到該隊列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
復制代碼

代碼中需要關注的兩個方法tryToFindTopicPublishInfoselectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲取路由信息並更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息,如果沒有獲取到,則會自己去namesrv獲取路由信息。selectOneMessageQueue方法通過輪詢的方式,返回一個隊列,以達到負載均衡的目的。

如果Producer發送消息失敗,會自動重試,重試的策略:

  1. 重試次數 < retryTimesWhenSendFailed(可配置)
  2. 總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
  3. 同時滿足上面兩個條件后,Producer會選擇另外一個隊列發送消息
五、消息存儲

RocketMQ的消息存儲是由consume queuecommit log配合完成的。

1、Consume Queue
consume queue是消息的邏輯隊列,相當於字典的目錄,用來指定消息在物理文件commit log上的位置。

我們可以在配置中指定consume queue與commitlog存儲的目錄
每個topic下的每個queue都有一個對應的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件組織,如圖所示:


Consume Queue文件組織示意圖
  1. 根據topicqueueId來組織文件,圖中TopicA有兩個隊列0,1,那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。
  2. 按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發往重試隊列中,比如圖中的%RETRY%ConsumerGroupA
  3. 按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,並重試指定次數后,仍然失敗,則發往死信隊列,比如圖中的%DLQ%ConsumerGroupA

死信隊列(Dead Letter Queue)一般用於存放由於某種原因無法傳遞的消息,比如處理失敗或者已經過期的消息。

Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀,如下圖所示:


consumequeue文件存儲單元格式
  1. CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量
  2. Size存儲中消息的大小
  3. Message Tag HashCode存儲消息的Tag的哈希值:主要用於訂閱時消息過濾(訂閱時如果指定了Tag,會根據HashCode來快速查找到訂閱的消息)
2、Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本機所有的queue共享,不做任何區分。
文件的默認位置如下,仍然可通過配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構如下表所示,按照編號順序以及編號對應的內容依次存儲。


Commit Log存儲單元結構圖
3、消息存儲實現

消息存儲實現,比較復雜,也值得大家深入了解,后面會單獨成文來分析,這小節只以代碼說明一下具體的流程。

復制代碼
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操作物理文件在內存中的映射以及將內存數據持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 將Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分發消息位置到ConsumeQueue
    // 2.分發到IndexService建立索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
復制代碼
4、消息的索引文件

如果一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:


消息索引


索引文件主要用於根據key來查詢消息的,流程主要是:

  1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個索引文件里面包含的最大槽的數目,例如圖中所示 slotNum=5000000)
  2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后一項(倒序排列,slotValue 總是指向最新的一個索引項)
  3. 遍歷索引項列表返回查詢時間范圍內的結果集(默認一次最大返回的 32 條記錄)
六、消息訂閱

RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費端推送;另外一種是Pull模式,即消費端在需要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是采用消費端主動拉取的方式。

首先看下消費端的負載均衡:


消費端負載均衡


消費端會通過RebalanceService線程,10秒鍾做一次基於topic下的所有隊列負載:

  1. 遍歷Consumer下的所有topic,然后根據topic訂閱所有的消息
  2. 獲取同一topic和Consumer Group下的所有Consumer
  3. 然后根據具體的分配策略來分配消費隊列,分配的策略包含:平均分配、消費端配置等

如同上圖所示:如果有 5 個隊列,2 個 consumer,那么第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。這里采用的就是平均分配策略,它類似於我們的分頁,TOPIC下面的所有queue就是記錄,Consumer的個數就相當於總的頁數,那么每頁有多少條記錄,就類似於某個Consumer會消費哪些隊列。

通過這樣的策略來達到大體上的平均消費,這樣的設計也可以很方面的水平擴展Consumer來提高消費能力。

消費端的Push模式是通過長輪詢的模式來實現的,就如同下圖:


Push模式示意圖


Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求后,如果有消息就立即返回數據,Consumer端收到返回的消息后,再回調消費者設置的Listener方法。如果broker在收到Pull請求時,消息隊列里沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。

當然,Consumer端是通過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>中的PullRequest發送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發現沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啟動時,會啟動一個線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數據返回。

七、RocketMQ的其他特性

前面的6個特性都是基本上都是點到為止,想要深入了解,還需要大家多多查看源碼,多多在實際中運用。當然除了已經提到的特性外,RocketMQ還支持:

  1. 定時消息
  2. 消息的刷盤策略
  3. 主動同步策略:同步雙寫、異步復制
  4. 海量消息堆積能力
  5. 高效通信
  6. .......

其中涉及到的很多設計思路和解決方法都值得我們深入研究:

  1. 消息的存儲設計:既要滿足海量消息的堆積能力,又要滿足極快的查詢效率,還要保證寫入的效率。
  2. 高效的通信組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通信。
  3. .......

RocketMQ最佳實踐

一、Producer最佳實踐

1、一個應用盡可能用一個 Topic,消息子類型用 tags 來標識,tags 可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用 tags 在 broker 做消息過濾。
2、每個消息在業務層面的唯一標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。由於是哈希索引,請務必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。
3、消息發送成功或者失敗,要打印消息日志,務必要打印 sendresult 和 key 字段。
4、對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
5、某些應用如果不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。

二、Consumer最佳實踐

1、消費過程要做到冪等(即消費端去重)
2、盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。
3、優化每條消息消費過程

三、其他配置

線上應該關閉autoCreateTopicEnable,即在配置文件中將其設置為false

RocketMQ在發送消息時,會首先獲取路由信息。如果是新的消息,由於MQServer上面還沒有創建對應的Topic,這個時候,如果上面的配置打開的話,會返回默認TOPIC的(RocketMQ會在每台broker上面創建名為TBW102的TOPIC)路由信息,然后Producer會選擇一台Broker發送消息,選中的broker在存儲消息時,發現消息的topic還沒有創建,就會自動創建topic。后果就是:以后所有該TOPIC的消息,都將發送到這台broker上,達不到負載均衡的目的。

所以基於目前RocketMQ的設計,建議關閉自動創建TOPIC的功能,然后根據消息量的大小,手動創建TOPIC。

RocketMQ設計相關

RocketMQ的設計假定:

每台PC機器都可能宕機不可服務
任意集群都有可能處理能力不足
最壞的情況一定會發生
內網環境需要低延遲來提供最佳用戶體驗

RocketMQ的關鍵設計:

分布式集群化
強數據安全
海量數據堆積
毫秒級投遞延遲(推拉模式)

這是RocketMQ在設計時的假定前提以及需要到達的效果。我想這些假定適用於所有的系統設計。隨着我們系統的服務的增多,每位開發者都要注意自己的程序是否存在單點故障,如果掛了應該怎么恢復、能不能很好的水平擴展、對外的接口是否足夠高效、自己管理的數據是否足夠安全...... 多多規范自己的設計,才能開發出高效健壯的程序。

附錄:RocketMQ涉及到的幾個專業術語和整體架構介紹

一、RocketMQ中的專業術語

Topic
topic表示消息的第一級類型,比如一個電商系統的消息可以分為:交易消息、物流消息...... 一條消息必須有一個Topic

Tag
Tag表示消息的第二級類型,比如交易消息又可以分為:交易創建消息,交易完成消息..... 一條消息可以沒有Tag。RocketMQ提供2級消息分類,方便大家靈活控制。

Queue
一個topic下,我們可以設置多個queue(消息隊列)。當我們發送消息時,需要要指定該消息的topic。RocketMQ會輪詢該topic下的所有隊列,將消息發送出去。

Producer 與 Producer Group
Producer表示消息隊列的生產者。消息隊列的本質就是實現了publish-subscribe模式,生產者生產消息,消費者消費消息。所以這里的Producer就是用來生產和發送消息的,一般指業務系統。

Producer Group是一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。

Consumer 與 Consumer Group
消息消費者,一般由后台系統異步消費消息。

Push Consumer
Consumer 的一種,應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立刻回調 Listener 接口方法。
Pull Consumer
Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制。

Consumer Group是一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。

Broker
消息的中轉者,負責存儲和轉發消息。可以理解為消息隊列服務器,提供了消息的接收、存儲、拉取和轉發服務。broker是RocketMQ的核心,它不不能掛的,所以需要保證broker的高可用。

廣播消費
一條消息被多個Consumer消費,即使這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每個Consumer都消費一次。在廣播消費中的Consumer Group概念可以認為在消息划分方面無意義。

集群消費
一個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有 9 條消息,其中一個Consumer Group有 3 個實例(可能是 3 個進程,或者 3 台機器),那么每個實例只消費其中的 3 條消息。

NameServer
NameServer即名稱服務,兩個功能:

  1. 接收broker的請求,注冊broker的路由信息
  2. 接口client的請求,根據某個topic獲取其到broker的路由信息
    NameServer沒有狀態,可以橫向擴展。每個broker在啟動的時候會到NameServer注冊;Producer在發送消息前會根據topicNameServer獲取路由(到broker)信息;Consumer也會定時獲取topic路由信息。
二、RocketMQ Overview

rocketmq overview


Producer向一些隊列輪流發送消息,隊列集合稱為TopicConsumer如果做廣播消費,則一個consumer實例消費這個Topic對應的所有隊列;如果做集群消費,則多個Consumer實例平均消費這個Topic對應的隊列集合。

再看下RocketMQ物理部署結構圖:


RocketMQ網絡部署圖


RocketMQ網絡部署特點:

    1. Name Server 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
    2. Broker部署相對復雜,Broker分為MasterSlave,一個Master可以對應多個Slave,但是一個Slave只能對應一個MasterMasterSlave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId=0表示Master,非0表示SlaveMaster也可以部署多個。每個BrokerName Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server
    3. ProducerName Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer 完全無狀態,可集群部署。
    4. ConsumerName Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic 路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

 

 

 

 

RocketMQ事務消費和順序消費詳解

一、RocketMq有3中消息類型

1.普通消費

2. 順序消費

3.事務消費

  • 順序消費場景

在網購的時候,我們需要下單,那么下單需要假如有三個順序,第一、創建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。RocketMQ可以保證順序消費。

  • rocketMq實現順序消費的原理

 produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息

注意:是把把消息發到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue

單個節點(Producer端1個、Consumer端1個)

1、Producer.java 

復制代碼
package order;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.exception.MQBrokerException;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  
/** 
 * Producer,發送順序消息 
 */  
public class Producer {  
    public static void main(String[] args) {  
        try {  
            DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
            producer.start();  
  
            // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
            // "TagE" };  
  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 0);  
  
                System.out.println(sendResult);  
            }  
  
            producer.shutdown();  
        } catch (MQClientException e) {  
            e.printStackTrace();  
        } catch (RemotingException e) {  
            e.printStackTrace();  
        } catch (MQBrokerException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
復制代碼

2、Consumer.java

復制代碼
package order;   
import java.util.List;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicLong;   
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) 
 */  
public class Consumer1 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 設置自動提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",內容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer1 Started.");  
    }  
  
}  
復制代碼

結果如下圖所示:

這個五條數據被順序消費了

  • 多個節點(Producer端1個、Consumer端2個)

Producer.java

復制代碼
package order;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.exception.MQBrokerException;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  
/** 
 * Producer,發送順序消息 
 */  
public class Producer {  
    public static void main(String[] args) {  
        try {  
            DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
            producer.start();  
  
            // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
            // "TagE" };  
  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 0);  
  
                System.out.println(sendResult);  
            }  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 1);  
  
                System.out.println(sendResult);  
            }  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 2);  
  
                System.out.println(sendResult);  
            }  
  
            producer.shutdown();  
        } catch (MQClientException e) {  
            e.printStackTrace();  
        } catch (RemotingException e) {  
            e.printStackTrace();  
        } catch (MQBrokerException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
復制代碼

Consumer1.java

 
復制代碼
/** 
 * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) 
 */  
public class Consumer1 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
          
        /** 
         * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到  
         *,第二個線程無法訪問這個隊列 
         */  
        consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 設置自動提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",內容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer1 Started.");  
    }  
  
}  
復制代碼

Consumer2.java

復制代碼
/** 
 * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) 
 */  
public class Consumer2 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
          
        /** 
         * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到  
         *,第二個線程無法訪問這個隊列 
         */  
        consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 設置自動提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",內容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer2 Started.");  
    }  
  
}  
復制代碼

先啟動Consumer1和Consumer2,然后啟動Producer,Producer會發送15條消息
Consumer1消費情況如圖,都按照順序執行了




Consumer2消費情況如圖,都按照順序執行了

二、事務消費

這里說的主要是分布式事物。下面的例子的數據庫分別安裝在不同的節點上。

事物消費需要先說說什么是事務。比如說:我們跨行轉賬,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元之后,我的建設銀行也必須加1000元。這樣才能保證數據的一致性。假如工商銀行轉1000元之后,建設銀行的服務器突然宕機,那么我扣除了1000,但是並沒有在建設銀行給我加1000,就出現了數據的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。

再比如,我們進行網購的時候,我們下單之后,訂單提交成功,倉庫商品的數量必須減一。但是訂單可能是一個數據庫,倉庫數量可能又是在另個數據庫里面。有可能訂單提交成功之后,倉庫數量服務器突然宕機。這樣也出現了數據不一致的問題。

使用消息隊列來解決分布式事物:

現在我們去外面飯店吃飯,很多時候都不會直接給了錢之后直接在付款的窗口遞飯菜,而是付款之后他會給你一張小票,你拿着這個小票去出飯的窗口取飯。這里和我們的系統類似,提高了吞吐量。即使你到第二個窗口,師傅告訴你已經沒飯了,你可以拿着這個憑證去退款,即使中途由於出了意外你無法到達窗口進行取飯,但是只要憑證還在,可以將錢退給你。這樣就保證了數據的一致性。

如何保證憑證(消息)有2種方法:

1、在工商銀行扣款的時候,余額表扣除1000,同時記錄日志,而且這2個表是在同一個數據庫實例中,可以使用本地事物解決。然后我們通知建設銀行需要加1000給該用戶,建設銀行收到之后給我返回已經加了1000給用戶的確認信息之后,我再標記日志表里面的日志為已經完成。

2、通過消息中間件

原文地址:http://www.jianshu.com/p/453c6e7ff81c

 

RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改消息的狀態。

細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發現了Prepared消息,它會向消息發送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

例子:

Consumer.java

 

 

復制代碼
package transaction;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * Consumer,訂閱消息 
 */  
public class Consumer {  
  
    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicTransactionTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  
                try {  
  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",內容:" + new String(msg.getBody()));  
                    }  
  
                } catch (Exception e) {  
                    e.printStackTrace();  
  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試  
  
                }  
  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("transaction_Consumer Started.");  
    }  
}  
復制代碼

Producer.java

 

  

復制代碼
package transaction;  
  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;  
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;  
import com.alibaba.rocketmq.common.message.Message;  
  
/** 
 * 發送事務消息例子 
 *  
 */  
public class Producer {  
    public static void main(String[] args) throws MQClientException, InterruptedException {  
  
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
        TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");  
        producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
        // 事務回查最小並發數  
        producer.setCheckThreadPoolMinSize(2);  
        // 事務回查最大並發數  
        producer.setCheckThreadPoolMaxSize(2);  
        // 隊列數  
        producer.setCheckRequestHoldMax(2000);  
        producer.setTransactionCheckListener(transactionCheckListener);  
        producer.start();  
  
        // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"  
        // };  
        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();  
        for (int i = 1; i <= 2; i++) {  
            try {  
                Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,  
                        ("Hello RocketMQ " + i).getBytes());  
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);  
                System.out.println(sendResult);  
  
                Thread.sleep(10);  
            } catch (MQClientException e) {  
                e.printStackTrace();  
            }  
        }  
  
        for (int i = 0; i < 100000; i++) {  
            Thread.sleep(1000);  
        }  
  
        producer.shutdown();  
  
    }  
}  
復制代碼

TransactionExecuterImpl .java --執行本地事務

復制代碼
package transaction;  
  
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;  
import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
import com.alibaba.rocketmq.common.message.Message;  
  
/** 
 * 執行本地事務 
 */  
public class TransactionExecuterImpl implements LocalTransactionExecuter {  
    // private AtomicInteger transactionIndex = new AtomicInteger(1);  
  
    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {  
  
        System.out.println("執行本地事務msg = " + new String(msg.getBody()));  
  
        System.out.println("執行本地事務arg = " + arg);  
  
        String tags = msg.getTags();  
  
        if (tags.equals("transaction2")) {  
            System.out.println("======我的操作============,失敗了  -進行ROLLBACK");  
            return LocalTransactionState.ROLLBACK_MESSAGE;  
        }  
        return LocalTransactionState.COMMIT_MESSAGE;  
        // return LocalTransactionState.UNKNOW;  
    }  
}  
復制代碼


TransactionCheckListenerImpl--未決事務,服務器回查客戶端(目前已經被閹割啦)

 
復制代碼
package transaction;  
  
import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * 未決事務,服務器回查客戶端 
 */  
public class TransactionCheckListenerImpl implements TransactionCheckListener {  
    // private AtomicInteger transactionIndex = new AtomicInteger(0);  
  
    //在這里,我們可以根據由MQ回傳的key去數據庫查詢,這條數據到底是成功了還是失敗了。  
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {  
        System.out.println("未決事務,服務器回查客戶端msg =" + new String(msg.getBody().toString()));  
        // return LocalTransactionState.ROLLBACK_MESSAGE;  
  
        return LocalTransactionState.COMMIT_MESSAGE;  
  
        // return LocalTransactionState.UNKNOW;  
    }  
}  
復制代碼

producer端:發送數據到MQ,並且處理本地事物。這里模擬了一個成功一個失敗。Consumer只會接收到本地事物成功的數據。第二個數據失敗了,不會被消費。



Consumer只會接收到一個,第二個數據不會被接收到

 

 

 

 

分布式消息隊列RocketMQ&Kafka -- 消息的“順序消費”

在說到消息中間件的時候,我們通常都會談到一個特性:消息的順序消費問題。這個問題看起來很簡單:Producer發送消息1, 2, 3。。。 Consumer按1, 2, 3。。。順序消費。

但實際情況卻是:無論RocketMQ,還是Kafka,缺省都不保證消息的嚴格有序消費!

這個特性看起來很簡單,但為什么缺省他們都不保證呢?

 

“嚴格的順序消費”有多么困難

下面就從3個方面來分析一下,對於一個消息中間件來說,”嚴格的順序消費”有多么困難,或者說不可能。

發送端

發送端不能異步發送,異步發送在發送失敗的情況下,就沒辦法保證消息順序。

比如你連續發了1,2,3。 過了一會,返回結果1失敗,2, 3成功。你把1再重新發送1遍,這個時候順序就亂掉了。

存儲端

對於存儲端,要保證消息順序,會有以下幾個問題: 
(1)消息不能分區。也就是1個topic,只能有1個隊列。在Kafka中,它叫做partition;在RocketMQ中,它叫做queue。 如果你有多個隊列,那同1個topic的消息,會分散到多個分區里面,自然不能保證順序。

(2)即使只有1個隊列的情況下,會有第2個問題。該機器掛了之后,能否切換到其他機器?也就是高可用問題。

比如你當前的機器掛了,上面還有消息沒有消費完。此時切換到其他機器,可用性保證了。但消息順序就亂掉了。

要想保證,一方面要同步復制,不能異步復制;另1方面得保證,切機器之前,掛掉的機器上面,所有消息必須消費完了,不能有殘留。很明顯,這個很難!!!

接收端

對於接收端,不能並行消費,也即不能開多線程或者多個客戶端消費同1個隊列。

總結

從上面的分析可以看出,要保證消息的嚴格有序,有多么困難!

發送端和接收端的問題,還好解決一點,限制異步發送,限制並行消費。但對於存儲端,機器掛了之后,切換的問題,就很難解決了。

你切換了,可能消息就會亂;你不切換,那就暫時不可用。這2者之間,就需要權衡了。

業務需要全局有序嗎?

通過上面分析可以看出,要保證一個topic內部,消息嚴格的有序,是很困難的,或者說條件是很苛刻的。

那怎么辦呢?我們一定要使出所有力氣、用盡所有辦法,來保證消息的嚴格有序嗎?

這里就需要從另外一個角度去考慮這個問題:業務角度。正如在下面這篇博客中所說的: 
http://www.jianshu.com/p/453c6e7ff81c

實際情況中: 
(1)不關注順序的業務大量存在; 
(2) 隊列無序不代表消息無序。

第(2)條的意思是說:我們不保證隊列的全局有序,但可以保證消息的局部有序。

舉個例子:保證來自同1個order id的消息,是有序的!

下面就看一下在Kafka和RocketMQ中,分別是如何對待這個問題的:

Kafka中:發送1條消息的時候,可以指定(topic, partition, key) 3個參數。partiton和key是可選的。

如果你指定了partition,那就是所有消息發往同1個partition,就是有序的。並且在消費端,Kafka保證,1個partition只能被1個consumer消費。

或者你指定key(比如order id),具有同1個key的所有消息,會發往同1個partition。也是有序的。

RocketMQ: RocketMQ在Kafka的基礎上,把這個限制更放寬了一步。只指定(topic, key),不指定具體發往哪個隊列。也就是說,它更加不希望業務方,非要去要一個全局的嚴格有序。

 

 

消費模式

一、集群消費

之前的博客中,啟動的都是單個Consumer,如果啟動多個呢?

 

RocketMQ-集群消費 
RocketMQ-集群消費

 

其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/……的機制,這意味着我們將非常方便的通過加機器來實現水平擴展!

我們考慮一下這種情況:比如C2發生了重啟,一條消息發往C3進行消費,但是這條消息的處理需要0.1S,而此時C2剛好完成重啟,那么C2是否可能會收到這條消息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱后生產消息,都可能導致消息的重復消費!關於去重的話題會在后續中予以介紹!

至於消息分發到C1/C2/C3,其實也是可以設置策略的: 

RocketMQ-消息負載策略 
RocketMQ-消息負載策略

 

使用哪種策略,只需要實例化對應的對象即可,如:

AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle(); consumer.setAllocateMessageQueueStrategy(aqs);

上面內容,其實是一種消費模式——集群消費。 
RocketMQ的消費模式有2種,查看一下源碼:

public enum MessageModel { /** * broadcast */ BROADCASTING, /** * clustering */ CLUSTERING; }

 

在默認情況下,就是集群消費(CLUSTERING),也就是上面提及的消息的負載均衡消費。另一種消費模式,是廣播消費(BROADCASTING)。


二、廣播消費

廣播消費,類似於ActiveMQ中的發布訂閱模式,消息會發給Consume Group中的每一個消費者進行消費。 

RocketMQ-廣播消費模式設置 
RocketMQ-廣播消費模式設置

 

/** * Consumer,訂閱消息 */ public class Consumer2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876"); consumer.setConsumeMessageBatchMaxSize(10); // 設置為廣播消費模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(" Receive New Messages: " + msg); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }

 


內容補充

《RocketMQ(三)——HelloWorld》那篇博客的最后提到了單批次消息消費數量 ,本文既然提到了集群消費,那就針對這兩個內容再進行一次補充吧。 
如果我們有2台節點,Producerw往MQ上寫入20條數據 其中Consumer1中拉取了12條 。Consumer2中拉取了8 條,這種情況下,假如Consumer1宕機,那么我們消費數據的時候,只能消費到Consumer2中的8條,Consumer1中的12條已經持久化了。需要Consumer1恢復之后這12條數據才能繼續被消費。其實這種先啟動producer往MQ上寫數據,然后再啟動Consumer的情況本來就是違規操作,正確的情況應該是先啟動Consumer后再啟動producer。

 

 

集群消費和廣播消費

基本概念

MQ 是基於發布訂閱模型的消息系統。在 MQ 消息系統中消息的訂閱方訂閱關注的 Topic,以獲取並消費消息。由於訂閱方應用一般是分布式系統,以集群方式部署有多台機器。因此 MQ 約定以下概念。

集群:MQ 約定使用相同 Consumer ID 的訂閱者屬於同一個集群,同一個集群下的訂閱者消費邏輯必須完全一致(包括 Tag 的使用),這些訂閱者在邏輯上可以認為是一個消費節點。

集群消費:當使用集群消費模式時,MQ 認為任意一條消息只需要被集群內的任意一個消費者處理即可。

廣播消費:當使用廣播消費模式時,MQ 會將每條消息推送給集群內所有注冊過的客戶端,保證消息至少被每台機器消費一次。

場景對比

集群消費模式:

cl

適用場景&注意事項

  • 消費端集群化部署,每條消息只需要被處理一次。
  • 由於消費進度在服務端維護,可靠性更高。
  • 集群消費模式下,每一條消息都只會被分發到一台機器上處理,如果需要被集群下的每一台機器都處理,請使用廣播模式。
  • 集群消費模式下,不保證消息的每一次失敗重投等邏輯都能路由到同一台機器上,因此處理消息時不應該做任何確定性假設。

廣播消費模式:

bd

適用場景&注意事項

  • 每條消息都需要被相同邏輯的多台機器處理。
  • 消費進度在客戶端維護,出現重復的概率稍大於集群模式。
  • 廣播模式下,MQ 保證每條消息至少被每台客戶端消費一次,但是並不會對消費失敗的消息進行失敗重投,因此業務方需要關注消費失敗的情況。
  • 廣播模式下,第一次啟動時默認從最新消息消費,客戶端的消費進度是被持久化在客戶端本地的隱藏文件中,因此不建議刪除該隱藏文件,否則會丟失部分消息。
  • 廣播模式下,每條消息都會被大量的客戶端重復處理,因此推薦盡可能使用集群模式。
  • 目前僅 Java 客戶端支持廣播模式。
  • 廣播模式下服務端不維護消費進度,所以服務端不提供堆積查詢和報警功能。

使用集群模式模擬廣播:

cd-bd

適用場景&注意事項

    • 每條消息都需要被多台機器處理,每台機器的邏輯可以相同也可以不一樣。
    • 消費進度在服務端維護,可靠性高於廣播模式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM