高並發系統設計(十五):【消息隊列】如何降低消息隊列系統中消息的延遲?


什么是消息延遲?

消息隊列在消費過程中大量堆積就是消息延遲,也就是消費的頻率跟不上生產。比方說,生產者向隊列中一共生產了1000條消息,某一個消費者消費進度是900條,那么這個消費者的消費延遲就是100條消息。

如何監控消息延遲

  • 使用消息隊列提供的工具,通過監控消息的堆積來完成;
  • 通過生成監控消息的方式來監控消息的延遲情況。

kafka的消費延遲及監控

在Kafka0.9之前的版本中,消費進度是存儲在ZooKeeper中的,消費者在消費消息的時候,先要從ZooKeeper中獲取最新的消費進度,再從這個進度的基礎上消費后面的消息。
在Kafka0.9版本之后,消費進度被遷入到Kakfa的一個專門的topic叫“__consumer_offsets”里面。可以依據不同的版本,從不同的位置,獲取到這個消費進度的信息。

kafka的兩個消息監控工具

1、kafka-consumer-groups.sh”(它在Kafka安裝包的bin目錄下)

2、JMX

程序的消費延遲檢測

可以定義一種特殊的消息,然后啟動一個監控程序,將這個消息定時地循環寫入到消息隊列中,消息的內容可以是生成消息的時間戳,並且也會作為隊列的消費者消費數據。業務處理程序消費到這個消息時直接丟棄掉,而監控程序在消費到這個消息時,就可以和這個消息的生成時間做比較,如果時間差達到某一個閾值就可以向我們報警。

 

 

 

當然推薦用工具 和結合程序檢測同時使用

減少消息延遲的正確姿勢
想要減少消息的處理延遲,需要在消費端和消息隊列兩個層面來完成。在消費端,我們的目標是提升消費者的消息處理能力,你能做的是:

  • 優化消費代碼提升性能;
  • 增加消費者的數量(這個方式比較簡單)。

不過,第二種方式會受限於消息隊列的實現。比如說,如果消息隊列使用的是Kafka就無法通過增加消費者數量的方式,來提升消息處理能力。

因為在Kafka中,一個Topic(話題)可以配置多個Partition(分區),數據會被平均或者按照生產者指定的方式,寫入到多個分區中,那么在消費的時候,Kafka約定一個分區只能被一個消費者消費,為什么要這么設計呢?在我看來,如果有多個consumer(消費者)可以消費一個分區的數據,那么在操作這個消費進度的時候就需要加鎖,可能會對性能有一定的影響。
所以說,話題的分區數量決定了消費的並行度,增加多余的消費者也是沒有用處的,那么你可以通過增加分區來提高消費者的處理能力。

 

 

那么,如何在不增加分區的前提下提升消費能力呢?

既然不能增加consumer,那么可以在一個consumer中提升處理消息的並行度,所以可以考慮使用多線程的方式來增加處理能力:可以預先創建一個或者多個線程池,在接收到消息之后,把消息丟到線程池中來異步地處理,這樣,原本串行的消費消息的流程就變成了並行的消費,可以提高消息消費的吞吐量,在並行處理的前提下,就可以在一次和消息隊列的交互中多拉取幾條數據,然后分配給多個線程來處理。

 

另外,寫消費客戶端的時候要考慮這種場景,拉取不到消息可以等待一段時間再來拉取,等待的時間不宜過長,否則會增加消息的延遲。一般建議固定的10ms~100ms,也可以按照一定步長遞增,比如第一次拉取不到消息等待10ms,第二次20ms,最長可以到100ms,直到拉取到消息再回到10ms。

總結

如何提升消息隊列的性能來降低消息消費的延遲,重點是:

  • 可以使用消息隊列提供的工具,或者通過發送監控消息的方式,來監控消息的延遲情況;
  • 橫向擴展消費者是提升消費處理能力的重要方式;
  • 選擇高性能的數據存儲方式,可以提升消息的消費性能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM