分布式開放消息系統(RocketMQ)的原理與實踐


轉載:https://www.jianshu.com/p/453c6e7ff81c

 

分布式消息系統作為實現分布式系統可擴展、可伸縮的關鍵組件,需要具有高吞吐、高可用等特點,而談到消息系統的設計,就回避不了兩個問題:

1.消息的順序問題
2.消息的重復問題

RockerMQ作為阿里開源的一款高性能、高吞吐的消息中間件,它是怎樣來解決這兩的問題的?

RocketMQ有哪些關鍵特性?

其實現原理是怎樣的?

 

【關鍵特性及其實現原理】

【一、順序消費】

消息有序指的是可以按照消息的發送順序來消費。

例如:一筆訂單產生了3條消息,分別是訂單創建、訂單付款、訂單完成。消費時,必須按照順序消費才有意義,與此同時多筆訂單之間又是可以並行消費的。

例如生產者差生了2條消息:M1、M2,要保證這兩條消息的順序,應該怎樣做?可能腦中想到的是這樣的:

假定M1-->S1,M2-->S2,如果要保證M1比M2想消費,那么M1到達消費端被消費后,通知S2,然后S2再將M2發送到消費端。

但是這個模型存在的問題是:如果M1和M2分別發送到兩台Server上,就不能保證M1先到達MQ集群,也不能保證M1被先消費。換個角度看,如果M2先與M1到達MQ集群,甚至M2被消費后,M1才到達消費端,這時候消息就亂序了,說明以上模型是不能保證消息的順序的。

如何才能在MQ集群保證消息的順序?一種簡單的方式就是將M1、M2發送到同一個Server上:

上面這樣可以保證M1先與M2達到MQ集群(Producer等待M1發送成功后再發送M2),根據先到達先被消費的原則,M1會先於M2被消費,這樣就保證了消息的順序。

但是這個模型也僅僅是在理論上可以保證消息的順序,在實際場景中可能會遇到下面的問題:網絡延遲問題

只要將消息從一台服務器發往另一台服務器,就會存在網絡延遲問題,如上圖所示,如果發送M1耗時大於發送M2耗時,那么仍然存在可能M2被先消費,仍然不能保證消息的順序,即使M1和M2同時到達消費端,由於不清楚消費端1和消費端2的負載情況,仍然可能出現M2先於M1被消費的情況。

那如何解決這個問題呢?

將M1和M2發往同一個消費者,且發送M1后,需要消費端響應成功后才能發送M2。

但是這里又會存在另外的問題:如果M1被發送到消費端后,消費端1沒有響應,那么是繼續發送M2呢,還是重新發送M1?一般來說為了保證消息一定被消費,肯定會選擇重發M1到另外一個消費端2,如下圖,保證消息順序的正確方式:

這樣的模型就可以嚴格保證消息的順序,但是仍然會有問題:消費端1沒有響應Server時,有兩種情況,一種是M1確實沒有到達(數據可能在網絡傳輸中丟失),另一種是消費端已經消費M1並且已經發回響應消息,但是MQ Server沒有收到。如果是第二種情況,會導致M1被重復消費

回過頭來看消息順序消費問題,嚴格的順序消息非常容易理解,也可以通過文中所描述的方式來簡化處理,總結起來,要實現嚴格的順序消息,簡單可行的辦法就是:

保證 生產者—MQServer—消費者  是“一對一對一”的關系。

這樣的設計雖然簡單易行,但是也存在一些很嚴重的問題,比如:

1.並行度會成為消息系統的瓶頸(吞吐量不夠)
2.產生更多的異常處理。比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。

我們最終的目標是要集群的高容錯性和高吞吐量,這似乎是一對不可調和的矛盾,那么阿里是如何解決的呢?

世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它!—— 沈詢

有些問題,看起來很重要,但實際上我們可以通過合理的設計將問題分解來規避,如果硬要把時間花在解決問題本身,實際上不僅效率低下,而且也是一種浪費。從這個角度來看消息的順序問題,可以得出兩個結論:

1.不關注亂序的應用大量存在
2.隊列無序並不意味着消息無序

所以從業務層面來保證消息的順序,而不僅僅是依賴於消息系統,是不是我們更應該尋求的一種合理的方式?

最后從源碼角度分析RocketMQ怎么實現發送順序消息。

RocketMQ通過輪詢所有隊列的方式來確定消息被發送到哪一個隊列(負載均衡策略)。

比如下面的例子中,訂單號相同的消息揮別先后發送到同一個隊列中:

// RocketMQ通過MessageQueueSelector中實現的算法來確定消息發送到哪一個隊列上
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
// 當然你可以根據業務實現自己的MessageQueueSelector來決定消息按照何種策略發送到消息隊列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

 在獲取到路由信息以后,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的肯定是同一個隊列。

private SendResult send()  {
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根據我們的算法,選擇一個發送隊列
        // 這里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

【二、消息重復】

造成消息重復的根本原因是:網絡不可達。

只要通過網絡交換數據,就無法避免這個問題。所以解決這個問題的辦法是繞過這個問題。

那么問題就變成了:如果消費端收到兩條一樣的消息,應該怎樣處理?

1.消費端處理消息的業務邏輯要保持冪等性。
2.保證每條數據都有唯一編號,且保證消息處理成功與去重表的日志同時出現。

第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。

第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么久不在處理這條消息。

第1條解決方案,很明顯應該在消費端實現,不屬於消息系統要實現的功能。

第2條可以由消息系統實現,也可以由業務端實現。正常情況下出現重復消息的概率其實很小,如果由消息系統來實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復問題的原因。

RocketMQ不保證消息不重復,如果你的業務系統需要保證嚴格的不重復消息,需要你自己在業務端去重。

【三、事務消息】

RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什么是事務消息以及支持事務消息的必要性。我們以一個轉賬的場景為例來說明這個問題:Bob向Smith轉賬100元。

在單機環境下,執行事務的情況大概是下面這個樣子:(單機環境下轉賬事務示意圖)

當用戶增長到一定的程度,Bob和Smith各自的賬戶和余額信息不再同一台服務器上了,那么上面的流程就會變成這樣:(集群環境下轉賬事務示意圖)

這時候會發現,同樣的一個轉賬業務,在集群環境下,耗時會成倍地增長,這顯然是不能接受的,那么如何來規避這個問題?

大事務 = 小事務 + 異步

將大事務拆分成多個小事務異步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。

轉賬的事務可以分解成如下兩個小事務:(小事務+異步消息)

圖中執行本地事務(Bob賬戶扣款)和發送異步消息應該保證同步成功或者同步失敗,也就是扣款成功了,發送消息也一定要成功,如果扣款失敗了,就不能發送消息。問題來了,我們是先扣款還是先發送消息呢?

首先看下線發送消息的情況,大致的示意圖如下:(事務消息:先發送消息)

存在的問題是:如果消息發送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。

先發消息不行,那就先扣款吧,大致的示意圖如下:(事務消息:先扣款)

存在的問題和上面類似:如果扣款成功,發送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。

可能還會想到別的辦法來解決這個問題:直接將發消息放到Bob扣款的扣款的事務中去,如果發送失敗,就拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。

這里需要聲明一下:如果使用Spring來管理事務的話,大可以將發送消息的邏輯放到本地事務中去,發送消息失敗就拋出異常,Spring捕捉到異常后就會回滾此事務,以此來保證本地事務與發送消息的原子性。

RocketMQ支持事務消息,下面來看看RocketMQ是怎樣來實現發送事務消息的:

RocketMQ分三個階段:

第一階段發送Prepared消息時,會拿到消息的地址。

第二階段執行本地事務。

第三階段通過第一階段拿到的地址去訪問消息,並修改消息的狀態。

這里又會發現問題,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事務消息,如果發現了Prepared消息,它會向Producer發送端確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

我們來看下RocketMQ的源碼,是如何處理事務消息的。Producer發送事務消息的部分。(完整的代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer)

// =============================發送事務消息的一系列准備工作========================================
// 未決事務,MQ服務器回查客戶端
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,相當於示例中檢查Bob賬戶並扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();

接着查看sendMessageInTransaction方法的源碼,總共分為三個階段:

1.發送Prepared消息

2.執行本地事務

3.發送確認消息

//  ================================事務消息的發送過程=============================================
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 邏輯代碼,非實際代碼
    // 1.發送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息發送成功,處理與消息關聯的本地事務單元 if(sendResult.getStatus==SEND_OK)
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.結束事務
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction()方法會將請求發往broker(mq server)去更新事務消息的最終狀態,如下:

1.根據sendResult找到Prepared消息,sendResult包含事務消息的ID。

2.根據location更新消息的最終狀態。

如果endTransaction()方法執行失敗,數據沒有發送發到broker,導致事務消息的狀態更新失敗,broker會有回查線程定時(默認1分鍾)掃描每個事務狀態的表格文件,如果已經提交或者回滾消息則直接跳過,如果是prepared狀態的則會向Producer發起checkTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用我們的事務設置的決斷方法來決定是回滾事務還是繼續執行,最后調用endTransactionOneway讓broker來更新消息的最終狀態。

再回到轉賬的例子,如果Bob的賬戶余額已經減少,且消息發送成功,Smith端開始消費這條消息,這個時候回出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費消息成功,真個過程可能會有重復消費的問題,按照上面的思路解決即可。

 

這樣基本上可以解決消費端超時的問題,但是消費失敗了怎么辦?愛麗緹提供的解決方法是:人工解決。我們可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,那么需要回滾整個流程。如果消息系統要實現這個回滾流程的話,系統的復雜度將大大提升,且很容易出現bug,估計出現Bug的概率會比消費失敗的概率大很多,這也是RocketMQ目前暫時沒有解決這個問題的原因。在設計實現消息系統時,我們需要衡量是否花這么大的代價來解決一個出現概率非常小的問題。 

 【四、Producer如何發送消息】

Producer輪詢某Topic下所有隊列的方式來實現發送方的負載均衡,如下圖所示:

RocketMQ的客戶端發送消息的源碼:

// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命周期內,只需要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",        // tag:給消息打標簽,用於區分一類消息,可為null
                        "OrderID188",  // key:自定義Key,可以用於去重,可為null
                        ("Hello MetaQ").getBytes());// body:消息內容
// 發送消息並返回結果
SendResult sendResult = producer.send(msg);
// 清理資源,關閉網絡連接,注銷自己
producer.shutdown();

在整個生命周期內,生產者需要調用一次start方法來初始化,初始化主要完成的任務有:

1.如果沒有指定namesrv地址,將會自動尋址
2.啟動定時任務:更新namesrv地址、從namesrv更新Topic路由信息、清理已經掛掉的broker、向所有的broker發送心跳...
3.啟動負載均衡的服務

初始化完成后,開始發送消息,發送消息的主要代碼如下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 檢查Producer的狀態是否是RUNNING
    this.makeSureStateOK();
    // 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 從路由信息中選擇一個消息隊列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 將消息發送到該隊列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代碼中需要關注兩個方法tryToFindTopicPublishInfoselectOneMessageQueue

前面說過在Producer初始化時,會啟動定時任務獲取路由信息並更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取Topic路由信息,如果沒有獲取到,則會自己去namesrv獲取路由信息,selectOneMessageQueue方法通過輪詢的方式,返回一個隊列,以達到負載的目的。

如果Producer發送消息失敗,會自動重試,重試的策略:

1.重試次數<retryTimesWhenSendFailed(可配置)

2.總的耗時(包含重試n次的耗時)<sendMsgTimeout(發送消息傳入的參數)

3.同時滿足上面兩個條件后,Producer會選擇另外一個隊列發送消息。

【五、消息存儲】

RocketMQ的消息存儲是由comsume queuecimmit log配合完成的。

[ 1.Consume Queue ]

consume queue是消息的邏輯隊列,相當於字典的目錄,用來指定消息在物理文件commit log上的位置。

我們可以在配置中指定consumequeue與commitlog存儲的目錄。

每個Topic下的每個queue都有一個對應的consumequeue文件。比如

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue的文件組織,如下:

1.根據Topic和queueId來組織文件。圖中TopicA有兩個隊列0,1。那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1又組成另一個ConsumeQueue。

2.按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發往重試隊列中,比如圖中的%RETRY%ConsumerGroupA

3.按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,並重試指定次數后,仍然失敗,則發往死信隊列,如圖中的%DLQ%ConsumerGroupA。

死信隊列(Dead Letter Queue)
一般用於存放某種原因無法傳遞的信息,比如處理是啊比或者已經過期的消息。

Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀,如下圖所示:(Consume Queue文件存儲單元格式)

1.CommitLog Offset是指這條消息在Commit Log 文件中的實際偏移量。

2.Size 存儲中消息的大小。

3.Message Tag HashCode存儲消息的Tag的哈希值:主要用於訂閱時消息過濾(訂閱時如果指定了Tag,會根據HashCode來快速查找到訂閱的消息)

[2. Commit Log] 

CommitLog:消息存放的物理地址,每台broker上的Commitlog被本機所有的queue共享,不做任何區分。

文件的默認位置如下:

${user.home}\store\${commitlog}\${fileName}

CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構如下表所示,按照消息編號順序以及編號對應的內容依次存儲。(CommitLog存儲單元結構圖)

[3.消息存儲實現]

消息存儲實現,比較復雜,值得深入了解,以代碼說明一下具體的流程。

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操作物理文件在內存中的映射以及將內存數據持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 將Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分發消息位置到ConsumeQueue
    // 2.分發到IndexService建立索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

[4.消息的索引文件]

如果一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:(消息索引)

索引文件主要是用於根據key來查詢消息的,流程主要是:

1.根據查詢的key的hahscode%slotNum得到具體的槽的位置。(slotNum是一個索引文件里面包含的最大槽的數目,例如圖中所示slotNum=500w)

2.根據slotValue(slot位置對應的值)查找到索引項列表的最后一項(倒序排列,slotValue總是指向罪行的一個索引項)

3.遍歷索引項列表返回查詢時間范圍內的結果集(默認一次最大返回的32條記錄)

 

【六、消息訂閱】

RocketMQ消息訂閱有兩種模式:

一種是push模式,即MQServer主動向消費端推送。

另一種是Pull模式,即消費端在需要時,主動到MQServer拉取。

但是再具體實現時,Push和Pull模式都是采用消費端主動拉取的方式。

首先看下消費端的負載均衡:

消費端會通過RebalanceService線程,10s做一次基於Topic下的所有隊列負載:

1.遍歷Consumer下所有的Topic,然后根據Topic訂閱所有的消息
2.獲取同一Topic和Consume Group下的所有Consumer
3.然后根據具體的分配策略來分配消費隊列,分配的策略包含:平均分配、消費端配置等

如上圖所示,如果有5個隊列,2個Consumer,那么第一個Consumer消費3個隊列,第二個Consumer消費2個隊列。這里采用的就是平均分配策略。它類似於分頁的過程,Topic下面所有的Queue就是記錄,Consumer的個數就相當於總的頁數,那么每頁有多少條記錄,就類似於Consumer會消費哪些隊列。

通過這樣的策略來達到大體上的平均消費,這樣的設計也可以很方便地水平擴展來提高Consumer的消費能力。

消費端的Push模式是通過長輪詢的模式來實現的,就如同下圖:(Push模式示意圖)

Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求后,如果有消息就立即返回數據,Consumer端收到返回的消息后,再回調消費者設置的Listener方法。如果broker在收到Pull請求時,消息隊列里沒有數據,broker端會阻塞請求指導有數據傳遞或超時才返回。

當然,Consumer端是通過一個線程將阻塞隊列LinkBlockingQueue<PullRequest>中的PullRequest發送到broker拉取消息,以防止Consumer一直被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發現沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啟動時,會啟動一個線程不停地從ConcurrentHashMap取出PullRequest檢查,直到有數據返回。

【七、RocketMQ的其他特性】

前面的6個特性都是基本上點到為止,想要深入了解,還需要多多查看源碼,多多在實際中運用歐冠。除了上面提到的特性,RocketMQ還支持:

1.定時消息

2.消息的刷盤策略

3.主動同步策略:同步雙寫、異步復制

4.海量消息堆積能力

5.高效通信

......

其中涉及到的很多設計思路和解決方法都值得深入研究:

1.消息的存儲設計:既要滿足海量消息的堆積能力,又要滿足極快的查詢效率,還要保證寫入的效率。

2.高效的通信組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通信。

.....

【Rocket的最佳實踐】

【一、Producer最佳實踐】

1.一個應用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。

2.每個消息在業務層面的唯一標識碼,要設置到keys字段,方便將來定位消息丟失問題。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。

3.消息發送成功或者失敗,要打印消息日志,務必打印sendResult和key字段

4.對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。

5.某些應用如果不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。

【二、Consumer最佳實踐】

1.消費過程要做到冪等

2.盡量使用批量方式消費,可以很大程度上提高消費吞吐量。

3.優化每條消息的消費過程

【三、其他配置】

線上應該關閉autoCreateTopicEnable,即在配置文件中將其設置為false。

RocketMQ在發送消息時,會首先獲取路由信息。如果是新的消息,由於MQServer上面還沒有創建對應的Topic,這個時候,如果上面的配置打開的話(autoCreateTopicEnable=true),會返回默認Topic的路由信息(RocketMQ會在每台Broker上面創建名為TBW102的Topic),然后Producer會選擇一台Broker發送消息,選中的Broker在存儲消息時,發現消息的Topic還沒有創建,就會自動創建Topic。后果就是:以后所有該Topic的消息,都將發送到這台Broker上,達不到負載均衡的目的。

所以基於目前RocketMQ的設計,建議關閉自動創建Topic的功能,然后根據消息量的大小,手動創建Topic。

【RocketMQ設計相關】

RocketMQ的設計假定:

每台PC機器都可能宕機不可服務
任意集群都有可能處理能力不足
最壞的情況一定會發生
內網需要低延遲來提供最佳用戶體驗

RocketMQ的關鍵設計

分布式集群化
強數據安全
海量數據堆積
毫秒級投遞延遲(推拉模式)

這是RocketMQ在設計時的假定前提以及需要到達的效果。我想這些假定適用於所有的系統設計。隨着我們系統的服務的增多,每位開發者都要注意自己的程序是否存在單點故障,如果掛了應該怎么恢復、能不能很好的水平擴展、對外的接口是否足夠高效、自己管理的數據是否足夠安全...... 多多規范自己的設計,才能開發出高效健壯的程序。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM