Java必知必會-RocketMQ高級應用


架構

核心模塊

  • rocketmq-broker:接受生產者發來的消息並存儲(通過調用rocketmq-store),消費者從這里取得消息
  • rocketmq-client:提供發送、接受消息的客戶端API。
  • rocketmq-namesrv:NameServer,類似於Zookeeper,這里保存着消息的TopicName,隊列等運行時的元信息。
  • rocketmq-common:通用的一些類,方法,數據結構等。
  • rocketmq-remoting:基於Netty4的client/server + fastjson序列化 + 自定義二進制協議。
  • rocketmq-store:消息、索引存儲等。
  • rocketmq-filtersrv:消息過濾器Server,需要注意的是,要實現這種過濾,需要上傳代碼到MQ!(一般而言,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更復雜的過濾需求,可以考慮filtersrv組件)。
  • rocketmq-tools:命令行工具。

NameServer

NameServer壓力不會太大,平時主要開銷是在維持心跳和提供Topic-Broker的關系數據。每個NameServer節點互相之間是獨立的,沒有任何信息交互。

Broker向NameServer發心跳時, 會帶上當前自己所負責的所有Topic信息,如果Topic個數太多(萬級別),會導致一次心跳中,就Topic的數據就幾十M,網絡情況差的話, 網絡傳輸失敗,心跳失敗,導致NameServer誤認為Broker心跳失敗。

NameServer 被設計成幾乎無狀態的,可以橫向擴展,節點之間相互之間無通信,通過部署多台機器來標記自己是一個偽集群。

每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。

所以從功能上看NameServer應該是和 ZooKeeper 差不多

Producer

每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。

所以從功能上看NameServer應該是和 ZooKeeper 差不多,據說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,后來改為了自己實現的 NameServer 。

  • 同步發送:同步發送指消息發送方發出數據后會在收到接收方發回響應之后才發下一個數據包。一般用於重要通知消息,例如重要通知郵件、營銷短信。
  • 異步發送:異步發送指發送方發出數據后,不等接收方發回響應,接着發送下個數據包,一般用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如用戶視頻上傳后通知啟動轉碼服務。
  • 單向發送:單向發送是指只負責發送消息而不等待服務器回應且沒有回調函數觸發,適用於某些耗時非常短但對可靠性要求並不高的場景,例如日志收集。

Broker

  • Broker是具體提供業務的服務器,單個Broker節點與所有的NameServer節點保持長連接及心跳,並會定時將Topic信息注冊到NameServer,順帶一提底層的通信和連接都是基於Netty實現的。
  • Broker負責消息存儲,以Topic為緯度支持輕量級的隊列,單機可以支撐上萬隊列規模,支持消息推拉模型。
  • 官網上有數據顯示:具有上億級消息堆積能力,同時可嚴格保證消息的有序性

Consumer

  • Consumer也由用戶部署,支持PUSH和PULL兩種消費模式,支持集群消費廣播消息,提供實時的消息訂閱機制
  • Pull:拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啟動消費過程,所以 Pull 稱為主動消費型。
  • Push:推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的內部維護工作,將消息到達時執行的回調接口留給用戶應用程序來實現。所以 Push 稱為被動消費類型,但從實現上看還是從消息服務器中拉取消息,不同於 Pull 的是 Push 首先要注冊消費監聽器,當監聽器處觸發后才開始消費消息。

消息領域模型

Message

Message(消息)就是要傳輸的信息。

一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。

一條消息也可以擁有一個可選的標簽(Tag)和額處的鍵值對,它們可以用於設置一個業務 Key 並在 Broker 上查找此消息以便在開發期間查找問題。

Topic

Topic(主題)可以看做消息的規類,它是消息的第一級類型。比如一個電商系統可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。

Topic 與生產者和消費者的關系非常松散,一個 Topic 可以有0個、1個、多個生產者向其發送消息,一個生產者也可以同時向不同的 Topic 發送消息。

一個 Topic 也可以被 0個、1個、多個消費者訂閱。

Tag

Tag(標簽)可以看作子主題,它是消息的第二級類型,用於為用戶提供額外的靈活性。使用標簽,同一業務模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分為:交易創建消息、交易完成消息等,一條消息可以沒有 Tag

標簽有助於保持您的代碼干凈和連貫,並且還可以為 RocketMQ 提供的查詢系統提供幫助。

Group

分組,一個組可以訂閱多個Topic。

分為ProducerGroup,ConsumerGroup,代表某一類的生產者和消費者,一般來說同一個服務可以作為Group,同一個Group一般來說發送和消費的消息都是一樣的

Queue

Kafka中叫Partition,每個Queue內部是有序的,在RocketMQ中分為讀和寫兩種隊列,一般來說讀寫隊列數量一致,如果不一致就會出現很多問題。

Message Queue

Message Queue(消息隊列),主題被划分為一個或多個子主題,即消息隊列。

一個 Topic 下可以設置多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發出去。

消息的物理管理單位。一個Topic下可以有多個Queue,Queue的引入使得消息的存儲可以分布式集群化,具有了水平擴展能力。

Offset

RocketMQ 中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用Offset 來訪問,Offset 為 java long 類型,64 位,理論上在 100年內不會溢出,所以認為是長度無限。

也可以認為 Message Queue 是一個長度無限的數組,Offset 就是下標。

image-20201113144614621

消息流程

Producer 與 NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從 NameServer 獲取 Topic 路由信息,並向提供 Topic 服務的 Broker Master 建立長連接,且定時向 Broker 發送心跳。

Producer 只能將消息發送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave建立長連接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。

初始化啟動:NameService

NamesrvStartup基本類描述了rocket初始化流程:

  • 第一步是初始化配置
  • 創建NamesrvController實例,並開啟兩個定時任務:
  • 每隔10s掃描一次Broker,移除處於不激活的Broker
  • 每隔10s打印一次KV配置。

Broker

Broker在RocketMQ中是進行處理Producer發送消息請求,Consumer消費消息的請求,並且進行消息的持久化,以及HA策略和服務端過濾,就是集群中很重的工作都是交給了Broker進行處理。

Broker模塊是通過BrokerStartup進行啟動的,會實例化BrokerController,並且調用其初始化方法

初始化流程很冗長,會根據配置創建很多線程池主要用來發送消息拉取消息查詢消息客戶端管理消費者管理,也有很多定時任務,同時也注冊了很多請求處理器,用來發送拉取消息查詢消息的。

Consumer

消費端會通過RebalanceService線程,10秒鍾做一次基於Topic下的所有隊列負載。

重復消費

例如,積分系統處理失敗了,這個系統要求重新發送一次這個消息,積分的系統重新接收並且處理成功了,但是別人的活動,優惠券等等服務也監聽了這個消息,就可能出現活動系統給他加GMV加兩次,優惠券扣兩次這種情況,這就是重復消費。一般采用接口冪等的方案解決重復消費的問題

接口冪等

同樣的參數調用我這個接口,調用多少次結果都是一個,你加GMV同一個訂單號你加一次是多少錢,你加N次都還是多少錢。

接口冪等的保證可以使用強校驗和弱校驗,分場景考慮

強校驗

每次消息過來都要拿着訂單號+業務場景這樣的唯一標識(比是天貓雙十一活動)去流水表查,看看有沒有這條流水,有就直接return不要走下面的流程了,沒有就執行后面的邏輯。之所以用流水表,是因為涉及到金錢這樣的活動,有啥問題后面也可以去流水表對賬,還有就是幫助開發人員定位問題。

弱校驗

一些不重要的場景,比如給誰發短信啥的,我就把這個id+場景唯一標識作為Redis的key,放到緩存里面失效時間看你場景,一定時間內的這個消息就去Redis判斷。

順序消費

數據量大的時候數據同步壓力還是很大的,有時候數據量大的表需要同步幾個億的數據。(並不是主從同步,主從延遲大會有問題,可能是從數據庫或者主數據庫同步到備庫

這種情況我們都是懟到隊列里面去,然后慢慢消費的,那問題就來了,我們在數據庫同時對一個Id的數據進行了增、改、刪三個操作,但是你消息發過去消費的時候變成了改,刪、增,這樣數據就不對了。

解決

一個topic下有多個隊列,為了保證發送有序,RocketMQ提供了MessageQueueSelector隊列選擇機制,他有三種實現:

img

我們可使用Hash取模法,讓同一個訂單發送到同一個隊列中,再使用同步發送,只有同個訂單的創建消息發送成功,再發送支付消息。這樣,我們保證了發送有序。

RocketMQ的topic內的隊列機制,可以保證存儲滿足FIFO,剩下的只需要消費者順序消費即可。

RocketMQ僅保證順序發送,順序消費由消費者業務保證!!!

這里很好理解,一個訂單你發送的時候放到一個隊列里面去,同一個的訂單號Hash后還是一樣的結果,那肯定是一個消費者消費,順序就可以保證了

真正的順序消費不同的中間件都有自己的不同實現我這里就舉個例子,大家思路理解下。

消息去重

去重原則:使用業務端邏輯保持冪等性

冪等性:就是用戶對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了副作用,數據庫的結果都是唯一的,不可變的。

只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣,需要業務端來實現。

去重策略:保證每條消息都有唯一編號(比如唯一流水號),且保證消息處理成功與去重表的日志同時出現。

建立一個消息表,拿到這個消息做數據庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現重復消費的情況,就會導致主鍵沖突,那么就不再處理這條消息。

消息重復

比如:網絡原因閃斷,ACK返回失敗等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。

不同的消息隊列發送的確認信息形式不同,例如RabbitMQ是發送一個ACK確認消息,RocketMQ是返回一個CONSUME_SUCCESS成功標志,Kafka實際上有個offset的概念。

消息可用性

當我們選擇好了集群模式之后,那么我們需要關心的就是怎么去存儲和復制這個數據,RocketMQ對消息的刷盤提供了同步和異步的策略來滿足我們的,當我們選擇同步刷盤之后,如果刷盤超時會給返回FLUSH_DISK_TIMEOUT,如果是異步刷盤不會返回刷盤相關信息,選擇同步刷盤可以盡最大程度滿足我們的消息不會丟失。

除了存儲有選擇之后,我們的主從同步提供了同步和異步兩種模式來進行復制,當然選擇同步可以提升可用性,但是消息的發送RT時間會下降10%左右。

RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。

Kafka采用的是獨立型的存儲結構,每個隊列一個文件。

這里帥丙認為,RocketMQ采用混合型存儲結構的缺點在於,會存在較多的隨機讀操作,因此讀的效率偏低。同時消費消息需要依賴ConsumeQueue,構建該邏輯消費隊列需要一定開銷。

刷盤機制

Broker 在消息的存取時直接操作的是內存(內存映射文件),這可以提供系統的吞吐量,但是無法避免機器掉電時數據丟失,所以需要持久化到磁盤中。

刷盤的最終實現都是使用NIO中的 MappedByteBuffer.force() 將映射區的數據寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區后,就會等待寫入完成。

異步而言,只是喚醒對應的線程,不保證執行的時機,流程如圖所示。

分布式事務

Half Message(半消息)

是指暫不能被Consumer消費的消息。Producer 已經把消息成功發送到了 Broker 端,但此消息被標記為暫不能投遞狀態,處於該種狀態下的消息稱為半消息。需要 Producer對消息的二次確認(消息回查)后,Consumer才能去消費它。

消息回查

由於網絡閃段,生產者應用重啟等原因。導致 Producer 端一直沒有對 Half Message(半消息) 進行 二次確認。這是Brocker服務器會定時掃描長期處於半消息的消息,會主動詢問 Producer端 該消息的最終狀態(Commit或者Rollback),該消息即為 消息回查

消息過濾

  • Broker端消息過濾  
    Broker中,按照Consumer的要求做過濾,優點是減少了對於Consumer無用消息的網絡傳輸。缺點是增加了Broker的負擔,實現相對復雜。
  • Consumer端消息過濾
    這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的消息要傳輸到Consumer端。

Broker的Buffer問題

Broker的Buffer通常指的是Broker中一個隊列的內存Buffer大小,這類Buffer通常大小有限。

另外,RocketMQ沒有內存Buffer概念,RocketMQ的隊列都是持久化磁盤,數據定期清除。

RocketMQ同其他MQ有非常顯著的區別,RocketMQ的內存Buffer抽象成一個無限長度的隊列,不管有多少數據進來都能裝得下,這個無限是有前提的,Broker會定期刪除過期的數據。

例如Broker只保存3天的消息,那么這個Buffer雖然長度無限,但是3天前的數據會被從隊尾刪除。

回溯消息

回溯消費是指Consumer已經消費成功的消息,由於業務上的需求需要重新消費,要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。並且重新消費一般是按照時間維度。

例如由於Consumer系統故障,恢復后需要重新消費1小時前的數據,那么Broker要提供一種機制,可以按照時間維度來回退消費進度。

RocketMQ支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。

消息堆積

消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峰,保證后端系統的穩定性,這就要求消息中間件具有一定的消息堆積能力,消息堆積分以下兩種情況:

  • 消息堆積在內存Buffer,一旦超過內存Buffer,可以根據一定的丟棄策略來丟棄消息,如CORBA Notification規范中描述。適合能容忍丟棄消息的業務,這種情況消息的堆積能力主要在於內存Buffer大小,而且消息堆積后,性能下降不會太大,因為內存中數據多少對於對外提供的訪問能力影響有限。
  • 消息堆積到持久化存儲系統中,例如DB,KV存儲,文件記錄形式。當消息不能在內存Cache命中時,要不可避免的訪問磁盤,會產生大量讀IO,讀IO的吞吐量直接決定了消息堆積后的訪問能力。
  • 評估消息堆積能力主要有以下四點:
    • 消息能堆積多少條,多少字節?即消息的堆積容量。
    • 消息堆積后,發消息的吞吐量大小,是否會受堆積影響?
    • 消息堆積后,正常消費的Consumer是否會受影響?
    • 消息堆積后,訪問堆積在磁盤的消息時,吞吐量有多大?

定時消息

定時消息是指消息發到Broker后,不能立刻被Consumer消費,要到特定的時間點或者等待特定的時間后才能被消費。

如果要支持任意的時間精度,在Broker層面,必須要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的產生巨大性能開銷。

RocketMQ支持定時消息,但是不支持任意時間精度,支持特定的level,例如定時5s,10s,1m等。

流量控制(削峰填谷)

網關的請求先放入消息隊列中,后端服務盡自己最大能力去消息隊列中消費請求。超時的請求可以直接返回錯誤。

還有一些服務特別是某些后台任務,不需要及時地響應,並且業務處理復雜且流程長,那么過來的請求先放入消息隊列中,后端服務按照自己的節奏處理

上面兩種情況分別對應着生產者生產過快和消費者消費過慢兩種情況,消息隊列都能在其中發揮很好的緩沖效果。

隊列模型

消息隊列有兩種模型:隊列模型發布/訂閱模型

生產者往某個隊列里面發送消息,一個隊列可以存儲多個生產者的消息,一個隊列也可以有多個消費者, 但是消費者之間是競爭關系,即每條消息只能被一個消費者消費。

當然隊列模型也可以通過消息全量存儲至多個隊列來解決一條消息被多個消費者消費問題,但是會有數據的冗余。

發布/訂閱模型

為了解決一條消息能被多個消費者消費的問題,發布/訂閱模型就來了。該模型是將消息發往一個Topic即主題中,所有訂閱了這個 Topic 的訂閱者都能消費這條消息。

其實可以這么理解,發布/訂閱模型等於我們都加入了一個群聊中,我發一條消息,加入了這個群聊的人都能收到這條消息。那么隊列模型就是一對一聊天,我發給你的消息,只能在你的聊天窗口彈出,是不可能彈出到別人的聊天窗口中的。

通過多隊列全量存儲相同的消息,即數據的冗余可以實現一條消息被多個消費者消費。RabbitMQ 就是采用隊列模型,通過 Exchange 模塊來將消息發送至多個隊列,解決一條消息需要被多個消費者消費問題。

這里還能看到假設群聊里除我之外只有一個人,那么此時的發布/訂閱模型和隊列模型其實就一樣了。

為了提高並發度,往往發布/訂閱模型還會引入隊列或者分區的概念。即消息是發往一個主題下的某個隊列或者某個分區中。RocketMQ中叫隊列,Kafka叫分區,本質一樣。

例如某個主題下有 5 個隊列,那么這個主題的並發度就提高為 5 ,同時可以有 5 個消費者並行消費該主題的消息。一般可以采用輪詢或者 key hash 取余等策略來將同一個主題的消息分配到不同的隊列中。

與之對應的消費者一般都有組的概念 Consumer Group, 即消費者都是屬於某個消費組的。一條消息會發往多個訂閱了這個主題的消費組。

假設現在有兩個消費組分別是Group 1Group 2,它們都訂閱了Topic-a。此時有一條消息發往Topic-a,那么這兩個消費組都能接收到這條消息。

然后這條消息實際是寫入Topic某個隊列中,消費組中的某個消費者對應消費一個隊列的消息。

在物理上除了副本拷貝之外,一條消息在Broker中只會有一份,每個消費組會有自己的offset即消費點位來標識消費到的位置。在消費點位之前的消息表明已經消費過了。當然這個offset是隊列級別的。每個消費組都會維護訂閱的Topic下的每個隊列的offset

image-20201113184655980

推/拉模式

推模式

一般而言我們在談論推拉模式的時候指的是 Comsumer 和 Broker 之間的交互

默認的認為 Producer 與 Broker 之間就是推的方式,即 Producer 將消息推送給 Broker,而不是 Broker 主動去拉取消息。

優點

消息實時性高, Broker 接受完消息之后可以立馬推送給 Consumer。

對於消費者使用來說更簡單,簡單啊就等着,反正有消息來了就會推過來。

缺點

推送速率難以適應消費速率,推模式的目標就是以最快的速度推送消息,當生產者往 Broker 發送消息的速率大於消費者消費消息的速率時,隨着時間的增長消費者那邊可能就“爆倉”了,因為根本消費不過來啊。當推送速率過快就像 DDos 攻擊一樣消費者就傻了。

所以說推模式難以根據消費者的狀態控制推送速率,適用於消息量不大、消費能力強要求實時性高的情況下。

拉模式

拉模式指的是 Consumer 主動從 Broker 請求拉取消息,即 Broker 被動的發送消息給 Consumer。

優點

拉模式主動權就在消費者身上了,消費者可以根據自身的情況來發起拉取消息的請求。假設當前消費者覺得自己消費不過來了,它可以根據一定的策略停止拉取,或者間隔拉取都行。

拉模式下 Broker 就相對輕松了,它只管存生產者發來的消息,至於消費的時候自然由消費者主動發起,來一個請求就給它消息唄,從哪開始拿消息,拿多少消費者都告訴它,它就是一個沒有感情的工具人,消費者要是沒來取也不關它的事。

拉模式可以更合適的進行消息的批量發送,基於推模式可以來一個消息就推送,也可以緩存一些消息之后再推送,但是推送的時候其實不知道消費者到底能不能一次性處理這么多消息。而拉模式就更加合理,它可以參考消費者請求的信息來決定緩存多少消息之后批量發送。

缺點

消息延遲,畢竟是消費者去拉取消息,但是消費者怎么知道消息到了呢?所以它只能不斷地拉取,但是又不能很頻繁地請求,太頻繁了就變成消費者在攻擊 Broker 了。因此需要降低請求的頻率,比如隔個 2 秒請求一次,你看着消息就很有可能延遲 2 秒了。

消息忙請求,忙請求就是比如消息隔了幾個小時才有,那么在幾個小時之內消費者的請求都是無效的,在做無用功。

選擇

RocketMQ 和 Kafka 都選擇了拉模式,當然業界也有基於推模式的消息隊列如 ActiveMQ。

因為現在的消息隊列都有持久化消息的需求,也就是說本身它就有個存儲功能,它的使命就是接受消息,保存好消息使得消費者可以消費消息即可。

而消費者各種各樣,身為 Broker 不應該有依賴於消費者的傾向,我已經為你保存好消息了,你要就來拿好了。

RocketMQ 和 Kafka 都是利用“長輪詢”來實現拉模式,減輕了拉模式的缺點

RocketMQ長輪詢

RocketMQ 中的 PushConsumer 其實是披着拉模式的方法,只是看起來像推模式而已

因為 RocketMQ 在被背后偷偷的幫我們去 Broker 請求數據了。

后台會有個 RebalanceService 線程,這個線程會根據 topic 的隊列數量和當前消費組的消費者個數做負載均衡,每個隊列產生的 pullRequest 放入阻塞隊列 pullRequestQueue 中。然后又有個 PullMessageService 線程不斷的從阻塞隊列 pullRequestQueue 中獲取 pullRequest,然后通過網絡請求 broker,這樣實現的准實時拉取消息。

然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用來處理拉消息請求的,有消息就直接返回

而 PullRequestHoldService 這個線程會每 5 秒從 pullRequestTable 取PullRequest請求,然后看看待拉取消息請求的偏移量是否小於當前消費隊列最大偏移量,如果條件成立則說明有新消息了,則會調用 notifyMessageArriving ,最終調用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新嘗試處理這個消息的請求,也就是再來一次,整個長輪詢的時間默認 30 秒。

ReputMessageService 線程用來不斷地從 commitLog 中解析數據並分發請求,構建出 ConsumeQueue 和 IndexFile 兩種類型的數據,並且也會有喚醒請求的操作,來彌補每 5s 一次這么慢的延遲

Kafka 中的長輪詢

像 Kafka 在拉請求中有參數,可以使得消費者請求在 “長輪詢” 中阻塞等待。

簡單的說就是消費者去 Broker 拉消息,定義了一個超時時間,也就是說消費者去請求消息,如果有的話馬上返回消息,如果沒有的話消費者等着直到超時,然后再次發起拉消息請求。

並且 Broker 也得配合,如果消費者請求過來,有消息肯定馬上返回,沒有消息那就建立一個延遲操作,等條件滿足了再返回。

消費者端調用的就是 Kafka 包裝過的 selector,而最終會調用 Java nio 的 select(timeout)

Broker 端處理所有請求的入口在 KafkaApis.scala 文件的 handle 方法下, 主要方法:handleFetchRequest

可以看到 RocketMQ 和 Kafka 都是采用“長輪詢”的機制,具體的做法都是通過消費者等待消息,當有消息的時候 Broker 會直接返回消息,如果沒有消息都會采取延遲處理的策略,並且為了保證消息的及時性,在對應隊列或者分區有新消息到來的時候都會提醒消息來了,及時返回消息。

一句話說就是消費者和 Broker 相互配合,拉取消息請求不滿足條件的時候 hold 住,避免了多次頻繁的拉取動作,當消息一到就提醒返回。

面試

1、如何保證消息不丟失

就我們市面上常見的消息隊列而言,只要配置得當,我們的消息就不會丟。

img

可以看到一共有三個階段,分別是生產消息、存儲消息和消費消息。我們從這三個階段分別入手來看看如何確保消息不會丟失。

生產消息

生產者發送消息至Broker,需要處理Broker的響應,不論是同步還是異步發送消息,同步和異步回調都需要做好try-catch,妥善的處理響應,如果Broker返回寫入失敗等錯誤消息,需要重試發送。當多次發送失敗需要作報警,日志記錄等。這樣就能保證在生產消息階段消息不會丟失。

存儲消息

存儲消息階段需要在消息刷盤之后再給生產者響應,假設消息寫入緩存中就返回響應,那么機器突然斷電這消息就沒了,而生產者以為已經發送成功了。

如果Broker是集群部署,有多副本機制,即消息不僅僅要寫入當前Broker,還需要寫入副本機中。那配置成至少寫入兩台機子后再給生產者響應。這樣基本上就能保證存儲的可靠了。

消費消息

當消費者拿到消息之后直接存入內存隊列中就直接返回給Broker消費成功,這是不對的。

需要考慮拿到消息放在內存之后消費者就宕機了怎么辦。所以我們應該在消費者真正執行完業務邏輯之后,再發送給Broker消費成功,這才是真正的消費了。

所以只要我們在消息業務邏輯處理完成之后再給Broker響應,那么消費階段消息就不會丟失。

小結

生產者需要處理好Broker的響應,出錯情況下利用重試、報警等手段。

Broker需要控制響應的時機,單機情況下是消息刷盤后返回響應,集群多副本情況下,即發送至兩個副本及以上的情況下再返回響應。

消費者需要在執行完真正的業務邏輯之后再返回響應給Broker

但是要注意消息可靠性增強了,性能就下降了,等待消息刷盤、多副本同步后返回都會影響性能。因此還是看業務,例如日志的傳輸可能丟那么一兩條關系不大,因此沒必要等消息刷盤再響應。

2、重復消息

假設我們發送消息,就管發,不管Broker的響應,那么我們發往Broker是不會重復的。

但是一般情況我們是不允許這樣的,這樣消息就完全不可靠了,我們的基本需求是消息至少得發到Broker上,那就得等Broker的響應,那么就可能存在Broker已經寫入了,當時響應由於網絡原因生產者沒有收到,然后生產者又重發了一次,此時消息就重復了。

再看消費者消費的時候,假設我們消費者拿到消息消費了,業務邏輯已經走完了,事務提交了,此時需要更新Consumer offset了,然后這個消費者掛了,另一個消費者頂上,此時Consumer offset還沒更新,於是又拿到剛才那條消息,業務又被執行了一遍。於是消息又重復了。

可以看到正常業務而言消息重復是不可避免的,因此我們只能從另一個角度來解決重復消息的問題。

關鍵點就是冪等。既然我們不能防止重復消息的產生,那么我們只能在業務上處理重復消息所帶來的影響。

冪等處理重復消息

例如這條 SQLupdate t1 set money = 150 where id = 1 and money = 100; 執行多少遍money都是150,這就叫冪等。

因此需要改造業務處理邏輯,使得在重復消息的情況下也不會影響最終的結果。

可以通過上面我那條 SQL 一樣,做了個前置條件判斷,即money = 100情況,並且直接修改,更通用的是做個version即版本號控制,對比消息中的版本號和數據庫中的版本號。

或者通過數據庫的約束例如唯一鍵,例如insert into update on duplicate key...

或者記錄關鍵的key,比如處理訂單這種,記錄訂單ID,假如有重復的消息過來,先判斷下這個ID是否已經被處理過了,如果沒處理再進行下一步。當然也可以用全局唯一ID等等。

消息有序性

全局有序

如果要保證消息的全局有序,首先只能由一個生產者往Topic發送消息,並且一個Topic內部只能有一個隊列(分區)。消費者也必須是單線程消費這個隊列。這樣的消息就是全局有序的!

不過一般情況下我們都不需要全局有序,即使是同步MySQL Binlog也只需要保證單表消息有序即可。

部分有序

因此絕大部分的有序需求是部分有序,部分有序我們就可以將Topic內部划分成我們需要的隊列數,把消息通過特定的策略發往固定的隊列中,然后每個隊列對應一個單線程處理的消費者。這樣即完成了部分有序的需求,又可以通過隊列數量的並發來提高消息處理效率。

image-20201113201630770

消息堆積處理

消息的堆積往往是因為生產者的生產速度與消費者的消費速度不匹配。有可能是因為消息消費失敗反復重試造成的,也有可能就是消費者消費能力弱,漸漸地消息就積壓了。

因此我們需要先定位消費慢的原因,如果是bug則處理 bug ,如果是因為本身消費能力較弱,我們可以優化下消費邏輯,比如之前是一條一條消息消費處理的,這次我們批量處理,比如數據庫的插入,一條一條插和批量插效率是不一樣的。

假如邏輯我們已經都優化了,但還是慢,那就得考慮水平擴容了,增加Topic的隊列數和消費者數量,注意隊列數一定要增加,不然新增加的消費者是沒東西消費的。一個Topic中,一個隊列只會分配給一個消費者

當然你消費者內部是單線程還是多線程消費那看具體場景。不過要注意上面提高的消息丟失的問題,如果你是將接受到的消息寫入內存隊列之后,然后就返回響應給Broker,然后多線程向內存隊列消費消息,假設此時消費者宕機了,內存隊列里面還未消費的消息也就丟了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM