RocketMQ問題


1.為什么要用消息中間件:
1.解耦:   系統A只負責把數據寫到隊列中,誰想要或不想要這個數據(消息),系統A一點都不關心。
      即便現在系統D不想要userId這個數據了,系統B又突然想要userId這個數據了,都跟系統A無關,系統A一點代碼都不用改。
      系統D拿userId不再經過系統A,而是從消息隊列里邊拿。系統D即便掛了或者請求超時,都跟系統A無關,只跟消息隊列有關。
 
2.異步:  假設系統A運算出userId具體的值需要50ms,調用系統B的接口需要300ms,調用系統C的接口需要300ms,調用系統D的接口需要300ms。那么這次請求就需要50+300+300+300=950ms
      並且我們得知,系統A做的是主要的業務,而系統B、C、D是非主要的業務。比如系統A處理的是訂單下單,而系統B是訂單下單成功了,那發送一條短信告訴具體的用戶此訂單已成功,而系統C和系統D也是處理一些小事而已。
3.削峰/限流:  系統B和系統C根據自己的能夠處理的請求數去消息隊列中拿數據,這樣即便有每秒有8000個請求,那只是把請求放在消息隊列中,去拿消息隊列的消息由系統自己去控制,這樣就不會把整個系統給搞崩。

2.RocketMQ由哪些角色組成,每個角色作用和特點是什么?

  RocketMQ主要由 Producer、Broker、Consume組成。Producer 生產消息,Consumer 消費消息,Broker 存儲消息。
    1、消息生產者(producer):負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產 生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順 序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
    2、消息消費者(Consumer):負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉 取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費 (pull consumer)、推動式消費(push consumer)。
    3、代理服務器(Broker Server):消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收 從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
 

RocketMQ從消費發送到消費的執行流程

  1. Producer發送消息到Broker,負載均衡策略默認隨機
  2. Broker接收消息,寫入PageCage,返回成功
  3. Broker刷盤,消息存儲Consumer queue、commit log
  4. Consumer從Broker拉取消息,拉取方式長輪詢pull
  5. Consumer消費消息,處理業務邏輯
  6. Consumer返回ACK,更新Broker offset
  7. 消費失敗,消息轉入失敗隊列
  8. Broker的ScheduleService從重試隊列拉取消息,重放這個消息
  9. 重試16次如果還是失敗,消息進入死信隊列
  10. 通過RocketMQ操作面板監控死信隊列,手動處理

3.RocketMQ Broker中的消息被消費后會立即刪除嗎?

    不會,每條消息都會持久化到CommitLog中,每個consumer連接到broker后會維持消費進度信息,當有消息消費后只是當前consumer的消費進度(CommitLog的offset)更新了。

  那么消息會堆積嗎?什么時候清理過期消息?

    4.6版本默認48小時后會刪除不再使用的CommitLog文件。

      - 檢查這個文件最后訪問時間
      - 判斷是否大於過期時間
      - 指定時間刪除,默認凌晨4點

RocketMQ消費模式有幾種?

  消費模型由consumer決定,消費維度為Topic

集群消費,廣播消費

  集群消費: 一組consumer同時消費一個topic,可以分配消費負載均衡策略分配consumer對應消費topic下的哪些queue

        多個group同時消費一個topic時,每個group都會消費到數據

        一條消息只會被一個group中的consumer消費,

  廣播消費: 消息將對一 個Consumer Group 下的各個 Consumer 實例都消費一遍。即使這些 Consumer 屬於同一個Consumer Group ,消息也會被 Consumer Group 中的每個 Consumer 都消費一次。

順序消息缺陷

發送順序消息無法利用集群Fail Over特性消費順序消息的並行度依賴於隊列數量隊列熱點問題,個別隊列由於哈希不均導致消息過多,消費速度跟不上,產生消息堆積問題遇到消息失敗的消息,無法跳過,當前隊列消費暫停。

消費消息時使用的是push還是pull?

  在剛開始的時候就要決定使用哪種方式消費。兩種(都實現了MQConsumerInner接口):

         `DefaultLitePullConsumerImpl` 拉

         `DefaultMQPushConsumerImpl`推

  名稱上看起來是一個推,一個拉,但實際底層實現都是采用的**長輪詢機制**,即拉取方式broker端屬性 longPollingEnable 標記是否開啟長輪詢。默認開啟。

為什么要主動拉取消息而不使用事件監聽方式?

  事件驅動方式是建立好長連接,由事件(發送數據)的方式來實時推送。

  如果broker主動推送消息的話有可能push速度快,消費速度慢的情況,那么就會造成消息在consumer端堆積過多,同時又不能被其他consumer消費的情況。

NameServer實現原理:

  NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。主要包括兩個功能

    Broker管理,NameServer接受Broker集群的注冊信息並且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活;
路由信息管理,每個NameServer將保存關於Broker集群的整個路由信息和用於客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費
    NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一台NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息

  NameServer實例之間互不通信,這本身也是其設計亮點之一,即允許不同NameServer之間數據不同步(像Zookeeper那樣保證各節點數據強一致性會帶來額外的性能消耗)

RocketMQ集群模式

  單Master模式:只有一個 Master節點

​     優點:配置簡單,方便部署

​     缺點:這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用

  多Master模式: 一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

​     優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁盤配置為RAID10 時,即使機器宕機不可恢復情況下,由與 RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟)。性能最高。多 Master 多 Slave 模式,異步復制

​     缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到受到影響

  多Master多Slave模式(異步復制):每個 Master 配置一個 Slave,有多對Master-Slave, HA,采用異步復制方式,主備有短暫消息延遲,毫秒級。

​     優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master 宕機后,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。

​     缺點: Master 宕機,磁盤損壞情況,會丟失少量消息。

  多Master多Slave模式(同步雙寫):每個 Master 配置一個 Slave,有多對Master-Slave, HA采用同步雙寫方式,主備都寫成功,向應用返回成功。

     優點:數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高

​     缺點:性能比異步復制模式略低,大約低 10%左右,發送單個消息的 RT會略高。目前主宕機后,備機不能自動切換為主機。

broker如何處理拉取請求的?

  consumer首次請求broker,broker中是否有符合條件的消息

   有 -> 響應consumer,等待下次consumer的請求
   沒有->掛起consumer的請求,即不斷開連接,也不返回數據
   掛起時間長短,長輪詢寫死,短輪詢可以配
  使用consumer的offset,DefaultMessageStore#ReputMessageService#run方法:每隔1ms檢查commitLog中是否有新消息,有的話寫入到pullRequestTable,當有新消息的時候返回請求,PullRequestHoldService 來Hold連接,每個5s執行一次檢查pullRequestTable有沒有消息,有的話立即推送。

 

RocketMQ如何做負載均衡?

通過Topic在多broker種分布式存儲實現。

producer端

  發送端指定Target message queue發送消息到相應的broker,來達到寫入時的負載均衡

    - 提升寫入吞吐量,當多個producer同時向一個broker寫入數據的時候,性能會下降
    - 消息分布在多broker種,為負載消費做准備

  每 30 秒從 nameserver獲取 Topic 跟 Broker 的映射關系,近實時獲取最新數據存儲單元,queue落地在哪個broker中

  在使用api中send方法的時候,可以指定Target message queue寫入或者使用MessageQueueSelector

默認策略是隨機選擇:

  - producer維護一個index
  - 每次取節點會自增
  - index向所有broker個數取余
  - 自帶容錯策略

consumer端

客戶端完成負載均衡

  - 獲取集群其他節點
  - 當前節點消費哪些queue
  - **負載粒度直到Message Queue**
  - **consumer的數量最好和Message Queue的數量對等或者是倍數,不然可能會有消費傾斜**
  - 每個consumer通過**balanced**維護processQueueTable
  - processQueueTable為當前consumer的消費queue
  - processQueueTable中有 
    - ProcessQueue :維護消費進度,從broker中拉取回來的消息緩沖
    - MessageQueue : 用來定位查找queue

負載均衡算法

  平均分配策略(默認)(AllocateMessageQueueAveragely)
  環形分配策略(AllocateMessageQueueAveragelyByCircle)
  手動配置分配策略(AllocateMessageQueueByConfig)
  機房分配策略(AllocateMessageQueueByMachineRoom)
  一致性哈希分配策略(AllocateMessageQueueConsistentHash)
  靠近機房策略(AllocateMachineRoomNearby)

 MQ與DB一致性原理(兩方事務):

單個topic可以分布在多個broker上嗎?
  • 一個topic分布在多個broker上,一個broker可以配置多個topic,它們是多對多的關系。 
  •  如果某個topic消息量很大,應該給它多配置幾個隊列,並且盡量多分布在不同broker上,減輕某個broker的壓力。
  •  topic消息量都比較均勻的情況下,如果某個broker上的隊列越多,則該broker壓力越大。

RocketMQ如何做負載均衡

Topic在Broker集群中分布式存儲

Producer端:輪詢
Consumer端:平均分配策略,一個隊列最多被一個消費組的一個Consumer消費,一個Consumer可以消費多個隊列

 

重復消費的原因

  • Consumer消費完,宕機,未返回ACK
  • Consumer消費完,返回ACK,網絡斷開,Broker未收到
  • 主Broker更新ACK,副Broker未復制,主Broker宕機
 

消息被重復消費,如何保證(也就是說,在某個consumser已經消費過了,由於網絡波動或者其他原因導致的消息重復被消費)?

  冪等性,也就是訪問多次,結果不變,實現的方式有很多,通過flag去標記,或者保存數據庫的時候,將其中某個字段設置為主鍵,當再次被消費的時候,發現主鍵存在數據庫,則不再進行消費。   
 

broker與nameserver關系

  • 連接
  • 單個broker和所有nameserver保持長連接
  • 心跳
  •      心跳間隔:每隔30秒(此時間無法更改)向所有nameserver發送心跳,心跳包含了自身的topic配置信息。
  •      心跳超時:nameserver每隔10秒鍾(此時間無法更改),掃描所有還存活的broker連接,若某個連接2分鍾內(當前時間與最后更新時間差值超過2分鍾,此時間無法更改)沒有發送心跳數據,則斷開連接。
  •  斷開
  •      時機:broker掛掉;心跳超時導致nameserver主動關閉連接
  •      動作:一旦連接斷開,nameserver會立即感知,更新topc與隊列的對應關系,但不會通知生產者和消費者
可用性
   由於消息分布在各個broker上,一旦某個broker宕機,則該broker上的消息讀寫都會受到影響。所以rocketmq提供了master/slave的結構,salve定時從master同步數據,如果master宕機,則slave提供消費服務,但是不能寫入消息,此過程對應用透明,由rocketmq內部解決。
這里有兩個關鍵點:
  • 一旦某個broker master宕機,生產者和消費者多久才能發現?受限於rocketmq的網絡連接機制,默認情況下,最多需要30秒,但這個時間可由應用設定參數來縮短時間。這個時間段內,發往該broker的消息都是失敗的,而且該broker的消息無法消費,因為此時消費者不知道該broker已經掛掉。
  •  消費者得到master宕機通知后,轉向slave消費(重定向,對於2次開發者透明),但是slave不能保證master的消息100%都同步過來了,因此會有少量的消息丟失。但是消息最終不會丟的,一旦master恢復,未同步過去的消息會被消費掉。
靠性
  •  所有發往broker的消息,有同步刷盤和異步刷盤機制,總的來說,可靠性非常高
  •  同步刷盤時,消息寫入物理文件才會返回成功,因此非常可靠
  •  異步刷盤時,只有機器宕機,才會產生消息丟失,broker掛掉可能會發生,但是機器宕機崩潰是很少發生的,除非突然斷電
消息清理
  • 掃描間隔
  •      默認10秒,由broker配置參數cleanResourceInterval決定
  •  空間閾值
  •      物理文件不能無限制的一直存儲在磁盤,當磁盤空間達到閾值時,不再接受消息,broker打印出日志,消息發送失敗,閾值為固定值85%
  •  清理時機
  •      默認每天凌晨4點,由broker配置參數deleteWhen決定;或者磁盤空間達到閾值
  •  文件保留時長
  •      默認72小時,由broker配置參數fileReservedTime決定
消費者與nameserver關系
  •   連接
  •      單個消費者和一台nameserver保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,消費者會自動連接下一個nameserver,直到有可用連接為止,並能自動重連。
  • 心跳
  • 與nameserver沒有心跳
  •  輪詢時間
  • 默認情況下,消費者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味着某個broker如果宕機,客戶端最多要30秒才能感知。該時間由DefaultMQPushConsumer的pollNameServerInteval參數決定,可手動配置。
與broker關系
  •  連接
  • 單個消費者和該消費者關聯的所有broker保持長連接。
  • 心跳
  • 默認情況下,消費者每隔30秒向所有broker發送心跳,該時間由DefaultMQPushConsumer的heartbeatBrokerInterval參數決定,可手動配置。broker每隔10秒鍾(此時間無法更改),掃描所有還存活的連接,若某個連接2分鍾內(當前時間與最后更新時間差值超過2分鍾,此時間無法更改)沒有發送心跳數據,則關閉連接,並向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費
  •  斷開
  • 時機:消費者掛掉;心跳超時導致broker主動關閉連接
  • 動作:一旦連接斷開,broker會立即感知到,並向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費
負載均衡
集群消費模式下,一個消費者集群多台機器共同消費一個topic的多個隊列,一個隊列只會被一個消費者消費。如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。
消費機制
  •  本地隊列
  •     消費者不間斷的從broker拉取消息,消息拉取到本地隊列,然后本地消費線程消費本地消息隊列,只是一個異步過程,拉取線程不會等待本地消費線程,這種模式實時性非常高(本地消息隊列達到解耦的效果,響應時間減少)。對消費者對本地隊列有一個保護,因此本地消息隊列不能無限大,否則可能會占用大量內存,本地隊列大小由DefaultMQPushConsumer的pullThresholdForQueue屬性控制,默認1000,可手動設置。
  •  輪詢間隔
  •      消息拉取線程每隔多久拉取一次?間隔時間由DefaultMQPushConsumer的pullInterval屬性控制,默認為0,可手動設置。
  • 消息消費數量
  •      監聽器每次接受本地隊列的消息是多少條?這個參數由DefaultMQPushConsumer的consumeMessageBatchMaxSize屬性控制,默認為1,可手動設置。
默認的分配算法是AllocateMessageQueueAveragely
還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤每一條queue,只是以環狀輪流分queue的形式,如下圖:
消費進度存儲
     每隔一段時間將各個隊列的消費進度存儲到對應的broker上,該時間由DefaultMQPushConsumer的persistConsumerOffsetInterval屬性控制,默認為5秒,可手動設置。
 
如果一個topic在某broker上有3個隊列,一個消費者消費這3個隊列,那么該消費者和這個broker有幾個連接?
     一個連接,消費單位與隊列相關,消費連接只跟broker相關,事實上,消費者將所有隊列的消息拉取任務放到本地的隊列,挨個拉取,拉取完畢后,又將拉取任務放到隊尾,然后執行下一個拉取任務
不同消費者組中,對於同一條消息,可以重復消費,也就是說,A消息在group1中消費過后,到了group2中是一條新消息。
同一個消費者組中的消費者,可以訂閱不同的topic嗎?
不可以
這里其實是做了一個檢查,做這個檢查的默認前提是一個consumerGroup下面的訂閱消息是一樣的,就是每個consumer注冊的subscription應該是一樣的,如果不一樣就把之前注冊的刪除。
consumerTable中存放的消費者信息是按照消費組來的,那么一個組的消費信息如果不一樣,按照我們的例子中,則訂閱了  TOPICA的消費者心跳信息告訴  Broker:我們組訂閱的是  TOPICA!然后  Broker
就記錄下來了。過了一會訂閱了  TOPICB的消費者心跳信息高速  Broker:我們訂閱的是  TOPICB! 
這里就導致了訂閱消息相互覆蓋,那么拉取消息時,肯定有一個消費者沒法拉到消息,因為  Broker
上查詢不到訂閱信息。
 
RocketMQ了如何處理?UI轉圈,不給響應。
1. 對於大規模消息發送接收可以使用pull模式,手動處理消息拉取速度,消費的時候統計消費時間以供參考,保證單機上的消費速率。
2. 什么情況下產生消息堆積?
1.consumer故障(斷網,斷電等)導致,consumer重啟。
 
2.堆積的消息會過期嗎?不會過期,只有cimmitLog過期,如果堆積了幾百萬條,怎么處理?
即使加設備,上線了幾個新的consumer,短時間內也處理不了堆積的消息,超過一小時,怎么處理?

消息堆積如何處理

1、增加Consumer,增加MessageQueue,增加Consumer線程數
2、新建一個Topic,先消費將消息搬運到另外一個Topic,后用新Consumer消費處理

主要是在消費的時候,會影響新寫進來的消息,隊列:先進先出。
1.新上線一個group,將fromWhere定位到最后面,保證新來的消息可以正常處理,另外,讓老的group繼續消費堆積的消息,兩個並行執行。為了防止重復消費新的消息,記錄fromwhere,當offset到達fromwhere時候,停止消費。
2.准備一個臨時的topic,把原來topic中的消息挪到新的topic里,不做業務邏輯處理,只是挪過去, 新上線一個group,其中的consumer同時消費臨時topic中的數據。老的consumer繼續消費老的topic,因為此時老的topic中的消息是新進來的消息,剛生產的消息。只適合並發消費,臨時消費。
消息永遠在cimmitlog中,根據cimmitlog啥時候訪問到最后,超過48小時,才會進行刪除。
消息消費,是以group進行區分的,不是在message上打標記。維護了內存中的一個mapgroup,對應的offset定位消息消費到哪里,消息失敗的時候,先會進retry重試隊列,超過16次之后,才會進入到死信隊列,死信隊列也是根據group來的。
 

rocketmq怎么保證隊列完全順序消費?

RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(queue)順序。要全局順序只能一個分區。

也就是說:同一個topic,在broker中默認會存在多個queue,因此,不論是producer還是customer,都不能保證生產或者消費的順序性,即消費端消費的時候,是會分配到多個queue的,多個queue是同時拉取提交消費。生產者同樣。

在同一條queue里面,RocketMQ的確是能保證FIFO的。那么要做到順序消息,必須把消息確保投遞到同一條queue。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM