1. 丟消息
檢測消息丟失的方法
一般而言,一個新的系統剛剛上線,各方面都不太穩定,需要一個磨合期,這個時候,特別需要監控到你的系統中是否有消息丟失的情況。
如果是 IT 基礎設施比較完善的公司,一般都有分布式鏈路追蹤系統,使用類似的追蹤系統可以很方便地追蹤每一條消息。
可以利用消息隊列的有序性來驗證是否有消息丟失。原理非常簡單,在 Producer 端,我們給每個發出的消息附加一個連續遞增的序號,然后在 Consumer 端來檢查這個序號的連續性。
如果沒有消息丟失,Consumer 收到消息的序號必然是連續遞增的,或者說收到的消息,其中的序號必然是上一條消息的序號 +1。如果檢測到序號不連續,那就是丟消息了。還可以通過缺失的序號來確定丟失的是哪條消息,方便進一步排查原因。
大多數消息隊列的客戶端都支持攔截器機制,你可以利用這個攔截器機制,在 Producer 發送消息之前的攔截器中將序號注入到消息中,在 Consumer 收到消息的攔截器中檢測序號的連續性,這樣實現的好處是消息檢測的代碼不會侵入到你的業務代碼中,待你的系統穩定后,也方便將這部分檢測的邏輯關閉或者刪除。
如果是在一個分布式系統中實現這個檢測方法,有幾個問題需要你注意。
首先,像 Kafka 和 RocketMQ 這樣的消息隊列,它是不保證在 Topic 上的嚴格順序的,只能保證分區上的消息是有序的,所以我們在發消息的時候必須要指定分區,並且,在每個分區單獨檢測消息序號的連續性。
如果你的系統中 Producer 是多實例的,由於並不好協調多個 Producer 之間的發送順序,所以也需要每個 Producer 分別生成各自的消息序號,並且需要附加上 Producer 的標識,在 Consumer 端按照每個 Producer 分別來檢測序號的連續性。
Consumer 實例的數量最好和分區數量一致,做到 Consumer 和分區一一對應,這樣會比較方便地在 Consumer 內檢測消息序號的連續性。
確保消息可靠傳遞
整個消息從生產到消費的過程中,哪些地方可能會導致丟消息,以及應該如何避免消息丟失。一條消息從生產到消費完成這個過程,可以划分三個階段
- 生產階段: 在這個階段,從消息在 Producer 創建出來,經過網絡傳輸發送到 Broker 端。
- 存儲階段: 在這個階段,消息在 Broker 端存儲,如果是集群,消息會在這個階段被復制到其他的副本上。
- 消費階段: 在這個階段,Consumer 從 Broker 上拉取消息,經過網絡傳輸發送到 Consumer 上。
1. 生產階段
在生產階段,消息隊列通過最常用的請求確認機制,來保證消息的可靠傳遞:當你的代碼調用發消息方法時,消息隊列的客戶端會把消息發送到 Broker,Broker 收到消息后,會給客戶端返回一個確認響應,表明消息已經收到了。客戶端收到響應后,完成了一次正常消息的發送。
只要 Producer 收到了 Broker 的確認響應,就可以保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發送確認響應后,會自動重試,如果重試再失敗,就會以返回值或者異常的方式告知用戶。
你在編寫發送消息代碼時,需要注意,正確處理返回值或者捕獲異常,就可以保證這個階段的消息不會丟失。以 Kafka 為例,我們看一下如何可靠地發送消息:
同步發送時,只要注意捕獲異常即可。
try { RecordMetadata metadata = producer.send(record).get(); System.out.println(" 消息發送成功。"); } catch (Throwable e) { System.out.println(" 消息發送失敗!"); System.out.println(e); }
異步發送時,則需要在回調方法里進行檢查。這個地方是需要特別注意的,很多丟消息的原因就是,我們使用了異步發送,卻沒有在回調中檢查發送結果。
producer.send(record, (metadata, exception) -> { if (metadata != null) { System.out.println(" 消息發送成功。"); } else { System.out.println(" 消息發送失敗!"); System.out.println(exception); } });
2. 存儲階段
在存儲階段正常情況下,只要 Broker 在正常運行,就不會出現丟失消息的問題,但是如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。
如果對消息的可靠性要求非常高,可以通過配置 Broker 參數來避免因為宕機丟消息。
對於單個節點的 Broker,需要配置 Broker 參數,在收到消息后,將消息寫入磁盤后再給 Producer 返回確認響應,這樣即使發生宕機,由於消息已經被寫入磁盤,就不會丟失消息,恢復后還可以繼續消費。例如,在 RocketMQ 中,需要將刷盤方式 flushDiskType 配置為 SYNC_FLUSH 同步刷盤。
如果是 Broker 是由多個節點組成的集群,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回復發送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失。消息隊列通過消息復制來確保消息的可靠性的。
3. 消費階段
消費階段采用和生產階段類似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息后,執行用戶的消費業務邏輯,成功后,才會給 Broker 發送消費確認響應。如果 Broker 沒有收到消費確認響應,下
次拉消息的時候還會返回同一條消息,確保消息不會在網絡傳輸過程中丟失,也不會因為客戶端在執行消費邏輯中出錯導致丟失。
你在編寫消費代碼時需要注意的是,不要在收到消息后就立即發送消費確認,而是應該在執行完所有消費業務邏輯之后,再發送消費確認。
同樣,我們以用 Python 語言消費 RabbitMQ 消息為例,來看一下如何實現一段可靠的消費代碼:
def callback(ch, method, properties, body): print(" [x] 收到消息 %r" % body) # 在這兒處理收到的消息 database.save(body) print(" [x] 消費完成 ") # 完成消費業務邏輯后發送消費確認響應 ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)
在消費的回調方法 callback 中,正確的順序先是把消息保存到數據庫,然后再發送消費確認響應。這樣如果保存消息到數據庫失敗了,就不會執行消費確認的代碼,下次拉到的還是這條消息,直到消費成功。
兩個消費者先后去拉消息是否能拉到同一條消息?
首先,消息隊列一般都會有協調機制,不會讓這種情況出現,但是由於網絡不確定性,這種情況還是在極小概率下會出現的。
在同一個消費組內,A消費者拉走了index=10的這條消息,還沒返回確認,這時候這個分區的消費位置還是10,B消費者來拉消息,可能有2種情況:
- 1. 超時前,Broker認為這個分區還被A占用着,會拒絕B的請求。
- 2. 超時后,Broker認為A已經超時沒返回,這次消費失敗,當前消費位置還是10,B再來拉消息,會給它返回10這條消息。
- 在生產階段,你需要捕獲消息發送的錯誤,並重發消息。
- 在存儲階段,你可以通過配置刷盤和復制相關的參數,讓消息寫入到多個副本的磁盤上,來確保消息不會因為某個 Broker 宕機或者磁盤損壞而丟失。
- 在消費階段,你需要在處理完全部消費業務邏輯之后,再發送消費確認。
你在理解了這幾個階段的原理后,如果再出現丟消息的情況,應該可以通過在代碼中加一些日志的方式,很快定位到是哪個階段出了問題,然后再進一步深入分析,快速找到問題原因。
2. 重復消息
在消息傳遞過程中,如果出現傳遞失敗的情況,發送方會執行重試,重試的過程中就有可能會產生重復的消息。對使用消息隊列的業務系統來說,如果沒有對重復消息進行處理,就有可能會導致系統的數據出現錯誤。
消息重復的情況必然存在
在 MQTT 協議中,給出了三種傳遞消息時能夠提供的服務質量標准,這三種服務質量從低到高依次是:
-
At most once: 至多一次。消息在傳遞時,最多會被送達一次。換一個說法就是,沒什么消息可靠性保證,允許丟消息。一般都是一些對消息可靠性要求不太高的監控場景使用,比如每分鍾上報一次機房溫度數據,可以接受數據少量丟失。
-
At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現。
-
Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。
這個服務質量標准不僅適用於 MQTT,對所有的消息隊列都是適用的。我們現在常用的絕大部分消息隊列提供的服務質量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是這樣。也就是說,消息
隊列很難保證消息不重復。
Kafka 支持的“Exactly once”和我們剛剛提到的消息傳遞的服務質量標准“Exactly once”是不一樣的,它是 Kafka 提供的另外一個特性,Kafka 中支持的事務也和我們通常意義理解的事務有一定的差異。在 Kafka
中,事務和 Excactly once 主要是為了配合流計算使用的特性。巧妙地用了兩個所有人都非常熟悉的概念“事務”和“Exactly once”來包裝它的新的特性,實際上它實現的這個事務和 Exactly once 並不是我們通常
理解的那兩個特性。
為什么大部分消息隊列都選擇只提供 At least once 的服務質量,而不是級別更高的 Exactly once?
解決一個問題,往往會引發別的問題。若消息隊列實現了exactly once,會引發的問題有:
①消費端在pull消息時,需要檢測此消息是否被消費,這個檢測機制無疑會拉低消息消費的速度。可以預想到,隨着消息的劇增,消費性能勢必會急劇下降,導致消息積壓;
②檢查機制還需要業務端去配合實現,若一條消息長時間未返回ack,消息隊列需要去回調看下消費結果(這個類似於事物消息的回查機制)。這樣就會增加業務端的壓力,與很多的未知因素。
所以,消息隊列不實現exactly once,而是at least once + 冪等性,這個冪等性讓給我們去處理。
最重要的原因是消息隊列即使做到了Exactly once級別,consumer也還是要做冪等。因為在consumer從消息隊列取消息這里,如果consumer消費成功,但是ack失敗,consumer還是會取到重復的消息,所以消
息隊列花大力氣做成Exactly once並不能解決業務側消息重復的問題。
1、At least once + 冪等消費 = Exactly once,所以對於消息隊列來講,要做到Exactly once,其實是需消費端的共同配合(冪等消費)才可完成,消息隊列基本只提供At least once的實現;
2、從給的幾種冪等消費的方案看,需要引入數據庫、條件更新、分布式事務或鎖等額外輔助,消息隊列如果需要保障Exactly once,會導致消費端代碼侵入,例如需要消費端增加消息隊列用來處理冪等的client
端,而消費端的形態可是太多了,兼容適配工作量巨大。故這個Exactly once留給用戶自己處理,並且具有選擇權,畢竟不是所有業務場景都需要Exactly once,例如機房溫度上報的案例。
如果隊列的實現是At least once,但是為了確保消息不丟失,Broker Service會進行一定的重試,但是不可能一直重試,如果一直重試失敗怎么處理了?
有的消息隊列會有一個特殊的隊列來保存這些總是消費失敗的“壞消息”,然后繼續消費之后的消息,避免壞消息卡死隊列。這種壞消息一般不會是因為網絡原因或者消費者死掉導致的,大多都是消息數據本身有
問題,消費者的業務邏輯處理不了導致的。
用冪等性解決重復消息問題
一般解決重復消息的辦法是,在消費端,讓我們消費消息的操作具備冪等性。
冪等(Idempotence)是一個數學上的概念,它是這樣定義的:
如果一個函數 f(x) 滿足:f(f(x)) = f(x),則函數 f(x) 滿足冪等性。
這個概念被拓展到計算機領域,被用來描述一個操作、方法或者服務。一個冪等操作的特點是,其任意多次執行所產生的影響均與一次執行的影響相同。
一個冪等的方法,使用同樣的參數,對它進行多次調用和一次調用,對系統產生的影響是一樣的。所以,對於冪等的方法,不用擔心重復執行會對系統造成任何改變。
比如在不考慮並發的情況下,“將賬戶 X 的余額設置為 100 元”,執行一次后對系統的影響是,賬戶 X 的余額變成了 100 元。只要提供的參數 100 元不變,那即使再執行多少次,賬戶 X 的余額始終都是 100 元,
不會變化,這個操作就是一個冪等的操作。
再比如“將賬戶 X 的余額加 100 元”,這個操作它就不是冪等的,每執行一次,賬戶余額就會增加 100 元,執行多次和執行一次對系統的影響(也就是賬戶的余額)是不一樣的。
如果我們系統消費消息的業務邏輯具備冪等性,那就不用擔心消息重復的問題了,因為同一條消息,消費一次和消費多次對系統的影響是完全一樣的。也就可以認為,消費多次等於消費一次。
從對系統的影響結果來說:At least once + 冪等消費 = Exactly once。
那么如何實現冪等操作呢?最好的方式就是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。但是,不是所有的業務都能設計成天然冪等的,這里就需要一些方法和技巧來實現冪等。
下面我給你介紹幾種常用的設計冪等操作的方法:
1. 利用數據庫的唯一約束實現冪等
剛剛提到的那個不具備冪等特性轉賬的例子:將賬戶 X 的余額加 100 元。在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。
首先,我們可以限定,對於每個轉賬單每個賬戶只可以執行一次變更操作,在分布式系統中,這個限制實現的方法非常多,最簡單的是我們在數據庫中建一張轉賬流水表,這個表有三個字段:轉賬單 ID、賬戶
ID 和變更金額,然后給轉賬單 ID 和賬戶 ID 這兩個字段聯合起來創建一個唯一約束,這樣對於相同的轉賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。
這樣,我們消費消息的邏輯可以變為:“在轉賬流水表中增加一條轉賬記錄,然后再根據轉賬記錄,異步操作更新用戶余額即可。”在轉賬流水表增加一條轉賬記錄這個操作中,由於我們在這個表中預先定義了“賬
戶 ID 轉賬單 ID”的唯一約束,對於同一個轉賬單同一個賬戶只能插入一條記錄,后續重復的插入操作都會失敗,這樣就實現了一個冪等的操作。我們只要寫一個 SQL,正確地實現它就可以了。
基於這個思路,不光是可以使用關系型數據庫,只要是支持類似“INSERT IF NOT EXIST”語義的存儲類系統都可以用於實現冪等,
比如, Redis 的 SETNX 命令來替代數據庫中的唯一約束,來實現冪等消費。 ( redis中的hash:hsetnx <key> <field> <value>; )
比如,Elasticsearch中的冪等操作: PUT /movie_index/movie/3,加上文檔 ID
2. 為更新的數據設置前置條件
另外一種實現冪等的思路是,給數據變更設置一個前置條件,如果滿足條件就更新數據,否則拒絕更新數據,在更新數據的時候,同時變更前置條件中需要判斷的數據。這樣,重復執行這個操作時,由於第一次
更新數據的時候已經變更了前置條件中需要判斷的數據,不滿足前置條件,則不會重復執行更新數據操作。
比如,剛剛我們說過,“將賬戶 X 的余額增加 100 元”這個操作並不滿足冪等性,我們可以把這個操作加上一個前置條件,變為:“如果賬戶 X 當前的余額為 500 元,將余額加 100 元”,這個操作就具備了冪等
性。對應到消息隊列中的使用時,可以在發消息時在消息體中帶上當前的余額,在消費的時候進行判斷數據庫中,當前余額是否與消息中的余額相等,只有相等才執行變更操作。
但是,如果我們要更新的數據不是數值,或者我們要做一個比較復雜的更新操作怎么辦?用什么作為前置判斷條件呢?更加通用的方法是,給你的數據增加一個版本號屬性,每次更數據前,比較當前數據的版本
號是否和消息中的版本號一致,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等更新。
3. 記錄並檢查操作
如果上面提到的兩種實現冪等方法都不能適用於你的場景,我們還有一種通用性最強,適用范圍最廣的實現冪等性方法:記錄並檢查操作,也稱為“Token 機制或者 GUID(全局唯一 ID)機制”,實現的思路特別
簡單:在執行數據更新操作之前,先檢查一下是否執行過這個更新操作。
具體的實現方法是,在發送消息時,給每條消息指定一個全局唯一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,如果沒有消費過,才更新數據,然后將消費狀態置為已消費。
原理和實現是不是很簡單?其實一點兒都不簡單,在分布式系統中,這個方法其實是非常難實現的。首先,給每個消息指定一個全局唯一的 ID 就是一件不那么簡單的事兒,方法有很多,但都不太好同時滿足簡
單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,在“檢查消費狀態,然后更新數據並且設置消費狀態”中,三個操作必須作為一組操作保證原子性,才能真正實現冪等,否則就會出現 Bug。
比如說,對於同一條消息:“全局 ID 為 8,操作為:給 ID 為 666 賬戶增加 100 元”,有可能出現這樣的情況:
-
t0 時刻:Consumer A 收到條消息,檢查消息執行狀態,發現消息未處理過,開始執行“賬戶增加 100 元”;
-
t1 時刻:Consumer B 收到條消息,檢查消息執行狀態,發現消息未處理過,因為這個時刻,Consumer A 還未來得及更新消息執行狀態。
這樣就會導致賬戶被錯誤地增加了兩次 100 元,這是一個在分布式系統中非常容易犯的錯誤,一定要引以為戒。
對於這個問題,當然我們可以用事務來實現,也可以用鎖來實現,但是在分布式系統中,無論是分布式事務還是分布式鎖都是比較難解決問題。
幾種實現冪等操作的方法
- 可以利用數據庫的約束來防止重復更新數據,
- 可以為數據更新設置一次性的前置條件,來防止重復消息,如果這兩種方法都不適用於你的場景,
- 還可以用“記錄並檢查操作”的方式來保證冪等,這種方法適用范圍最廣,但是實現難度和復雜度也比較高,一般不推薦使用。
這些實現冪等的方法,不僅可以用於解決重復消息的問題,也同樣適用於,在其他場景中來解決重復請求或者重復調用的問題。比如,我們可以將 HTTP 服務設計成冪等的,解決前端或者 APP 重復提交表單數
據的問題;也可以將一個微服務設計成冪等的,解決 RPC 框架自動重試導致的重復調用問題。這些方法都是通用的。
3. 消息積壓問題
在使用消息隊列遇到的問題中,消息積壓這個問題,應該是最常遇到的問題。
消息積壓的直接原因,一定是系統中的某個部分出現了性能問題,來不及處理上游發送的消息,才會導致消息積壓。 所以在使用消息隊列時,如何來優化代碼的性能,避免出現消息積壓。
優化性能來避免消息積壓
在使用消息隊列的系統中,對於性能的優化,主要體現在生產者和消費者這一收一發兩部分的業務邏輯中。對於消息隊列本身的性能,不需要太關注。
主要原因是,對於絕大多數使用消息隊列的業務來說,消息隊列本身的處理能力要遠大於業務系統的處理能力。主流消息隊列的單個節點,消息收發的性能可以達到每秒鍾處理幾萬至幾十萬條消息的水平,還可
以通過水平擴展 Broker 的實例數成倍地提升處理能力。而一般的業務系統需要處理的業務邏輯遠比消息隊列要復雜,單個節點每秒鍾可以處理幾百到幾千次請求,已經可以算是性能非常好的了。所以,對於消
息隊列的性能優化,我們更關注的是,在消息的收發兩端,我們的業務代碼怎么和消息隊列配合,達到一個最佳的性能。
1. 發送端性能優化
如果說,代碼發送消息的性能上不去,需要優先檢查一下,是不是發消息之前的業務邏輯耗時太多導致的。
對於發送消息的業務邏輯,只需要注意設置合適的並發和批量大小,就可以達到很好的發送性能。
Producer 發送消息的過程,Producer 發消息給 Broker,Broker 收到消息后返回確認響應,這是一次完整的交互。假設這一次交互的平均時延是 1ms,把這 1ms 的時間分解開,它包括了下面這些步驟的耗時:
- 發送端准備數據、序列化消息、構造請求等邏輯的時間,也就是發送端在發送網絡請求之前的耗時;
- 發送消息和返回響應在網絡傳輸中的耗時;
- Broker 處理消息的時延。
如果是單線程發送,每次只發送 1 條消息,那么每秒只能發送 1000ms / 1ms * 1 條 /ms = 1000 條 消息,這種情況下並不能發揮出消息隊列的全部實力。
無論是增加每次發送消息的批量大小,還是增加並發,都能成倍地提升發送性能。至於到底是選擇批量發送還是增加並發,主要取決於發送端程序的業務性質。
比如說,你的消息發送端是一個微服務,主要接受 RPC 請求處理在線業務。很自然的,微服務在處理每次請求的時候,就在當前線程直接發送消息就可以了,因為所有 RPC 框架都是多線程支持多並發的,自
然也就實現了並行發送消息。並且在線業務比較在意的是請求響應時延,選擇批量發送必然會影響 RPC 服務的時延。這種情況,比較明智的方式就是通過並發來提升發送性能。
如果你的系統是一個離線分析系統,離線系統在性能上的需求是什么呢?它不關心時延,更注重整個系統的吞吐量。發送端的數據都是來自於數據庫,這種情況就更適合批量發送,你可以批量從數據庫讀取數
據,然后批量來發送消息,同樣用少量的並發就可以獲得非常高的吞吐量。
2. 消費端性能優化
使用消息隊列的時候,大部分的性能問題都出現在消費端,如果消費的速度跟不上發送端生產消息的速度,就會造成消息積壓。如果這種性能倒掛的問題只是暫時的,那問題不大,只要消費端的性能恢復之后,超過發送端的性能,那積壓的消息是可以逐漸被消化掉的。
要是消費速度一直比生產速度慢,時間長了,整個系統就會出現問題,要么,消息隊列的存儲被填滿無法提供服務,要么消息丟失,這對於整個系統來說都是嚴重故障。
所以,在設計系統的時候,一定要保證消費端的消費性能要高於生產端的發送性能,這樣的系統才能健康的持續運行。
消費端的性能優化除了優化消費業務邏輯以外,也可以通過水平擴容,增加消費端的並發數來提升總體的消費性能。特別需要注意的一點是,在擴容 Consumer 的實例數量的同時,必須同步擴容主題中的分區
(也叫隊列)數量,確保 Consumer 的實例數和分區數量是相等的。如果 Consumer 的實例數量超過分區數量,這樣的擴容實際上是沒有效果的。因為對於消費者,在每個分區上實際只能支持單線程消費。
一個解決消費慢的問題常見的錯誤:
它收消息處理的業務邏輯可能比較慢,也很難再優化了,為了避免消息積壓,在收到消息的 OnMessage 方法中,不處理任何業務邏輯,把這個消息放到一個內存隊列里面就返回了。然后它可以啟動很多的業務
線程,這些業務線程里面是真正處理消息的業務邏輯,這些線程從內存隊列里取消息處理,這樣它就解決了單個 Consumer 不能並行消費的問題。
這個方法是不是很完美地實現了並發消費?錯誤! 因為會丟消息。如果收消息的節點發生宕機,在內存隊列中還沒來及處理的這些消息就會丟失。
在onMessage方法結束后,如果沒有拋異常,就自動ACK了。而這個時候,消息只是在內存隊列中,並沒有被真正處理完。
理論上是可以的,但要注意,像RocketMQ,采用默認配置的時候,onMessage方法結束后,如果沒拋異常,默認就會自動確認了。
在消費端是否可以通過批量消費的方式來提升消費性能?在什么樣場景下,適合使用這種方法?或者說,這種方法有什么局限性?
批量消費有意義的場景要求:
- 1.要么消費端對消息的處理支持批量處理,比如批量入庫
- 2. 要么消費端支持多線程/協程並發處理,業務上也允許消息無序。
- 3. 或者網絡帶寬在考慮因素內,需要減少消息的overhead。
批量消費的局限性:
- 1. 需要一個整體ack的機制,一旦一條靠前的消息消費失敗,可能會引起很多消息重試。
- 2. 多線程下批量消費速度受限於最慢的那個線程。
但其實以上局限並沒有影響主流MQ的實現了批量功能。
1、要求消費端能夠批量處理或者開啟多線程進行單條處理
2、批量消費一旦某一條數據消費失敗會導致整批數據重復消費
3、對實時性要求不能太高,批量消費需要Broker積累到一定消費數據才會發送到Consumer
消費端進行批量操作,感覺和上面的先將消息放在內存隊列中,然后在並發消費消息,如果機器宕機,這些批量消息都會丟失,如果在數據庫層面,批量操作在大事務,會導致鎖的競爭,並且也會導致主備的不
一致。如果是一些不重要的消息如對日志進行備份,就可以使用批量操作之類的提高消費性能,因為一些日志消息丟失也是可以接受的。
如果使用了批量消費的方式,那么就需要批量確認,如果一次消費十條消息,除了第七條消費失敗了,其他的都處理成功了,但是這中情況下broker只能將消費的游標修改成消息7,而之后的消息雖然處理成功
了,但是也只能使用類似於拉回重傳的方式再次消費,浪費性能,而且這種批量消費對於消費者的並發我覺得不是很友好,可能消費者1來了取走了十條消息在處理,這時候消費者2過來了也想取十條消息,但是
他需要等待消費者1進行ack才可以取走消息。
可以簡單計算一下,消費並行度:單實例平均消費tps * 消費並行度 > 生產消息的總tps
消費並行度 = min(consumer實例數,分區數量)
消息積壓的緊急處理
還有一種消息積壓的情況是,日常系統正常運轉的時候,沒有積壓或者只有少量積壓很快就消費掉了,但是某一個時刻,突然就開始積壓消息並且積壓持續上漲。這種情況下需要你在短時間內找到消息積壓的原
因,迅速解決問題才不至於影響業務。
導致突然積壓的原因肯定是多種多樣的,不同的系統、不同的情況有不同的原因,不能一概而論。但是,排查消息積壓原因,是有一些相對固定而且比較有效的方法的。
能導致積壓突然增加,最粗粒度的原因,只有兩種:要么是發送變快了,要么是消費變慢了。
大部分消息隊列都內置了監控的功能,只要通過監控數據,很容易確定是哪種原因。如果是單位時間發送的消息增多,比如說是趕上大促或者搶購,短時間內不太可能優化消費端的代碼來提升消費性能,唯一的
方法是通過擴容消費端的實例數來提升總體的消費能力。
如果短時間內沒有足夠的服務器資源進行擴容,沒辦法的辦法是,將系統降級,通過關閉一些不重要的業務,減少發送方發送的數據量,最低限度讓系統還能正常運轉,服務一些重要業務。
還有一種不太常見的情況,你通過監控發現,無論是發送消息的速度還是消費消息的速度和原來都沒什么變化,這時候你需要檢查一下你的消費端,是不是消費失敗導致的一條消息反復消費這種情況比較多,這
種情況也會拖慢整個系統的消費速度。
如果監控到消費變慢了,你需要檢查你的消費實例,分析一下是什么原因導致消費變慢。優先檢查一下日志是否有大量的消費錯誤,如果沒有錯誤的話,可以通過打印堆棧信息,看一下你的消費線程是不是卡在
什么地方不動了,比如觸發了死鎖或者卡在等待某些資源上了。
優化消息收發性能,預防消息積壓的方法有兩種,增加批量或者是增加並發,在發送端這兩種方法都可以使用,在消費端需要注意的是,增加並發需要同步擴容分區數量,否則是起不到效果的。
對於系統發生消息積壓的情況,需要先解決積壓,再分析原因,畢竟保證系統的可用性是首先要解決的問題。快速解決積壓的方法就是通過水平擴容增加 Consumer 的實例數量。
消息積壓處理:
1、發送端優化,增加批量和線程並發兩種方式處理
2、消費端優化,優化業務邏輯代碼、水平擴容增加並發並同步擴容分區數量
查看消息積壓的方法:
1、消息隊列內置監控,查看發送端發送消息與消費端消費消息的速度變化
2、查看日志是否有大量的消費錯誤
3、打印堆棧信息,查看消費線程卡點信息
面試解決消息積壓的方法:
(1)臨時擴容,增加消費端,用硬件提升消費速度。
(2)服務降級,關閉一些非核心業務,減少消息生產。
(3)通過日志分析,監控等找到擠壓原因,消息隊列三部分,上游生產者是否異常生產大量數據,中游消息隊列存儲層是否出現問題,下游消費速度是否變慢,就能確定哪個環節出了問題
(4)根據排查解決異常部分。
(5)等待積壓的消息被消費,恢復到正常狀態,撤掉擴容服務器。
如何保證消息的嚴格順序?
怎么來保證消息的嚴格順序?主題層面是無法保證嚴格順序的,只有在隊列上才能保證消息的嚴格順序。
如果說,你的業務必須要求全局嚴格順序,就只能把消息隊列數配置成 1,生產者和消費者也只能是一個實例,這樣才能保證全局嚴格順序。
大部分情況下,並不需要全局嚴格順序,只要保證局部有序就可以滿足要求了。比如,在傳遞賬戶流水記錄的時候,只要保證每個賬戶的流水有序就可以了,不同賬戶之間的流水記錄是不需要保證順序的。
如果需要保證局部嚴格順序,可以這樣來實現。在發送端,我們使用賬戶 ID 作為 Key,采用一致性哈希算法計算出隊列編號,指定隊列來發送消息。一致性哈希算法可以保證,相同 Key 的消息總是發送到同一
個隊列上,這樣可以保證相同 Key 的消息是嚴格有序的。如果不考慮隊列擴容,也可以用隊列數量取模的簡單方法來計算隊列編號。