Kafka在大型應用中的 20 項最佳實踐


原標題:Kafka如何做到1秒處理1500萬條消息?

Apache Kafka 是一款流行的分布式數據流平台,它已經廣泛地被諸如 New Relic(數據智能平台)、Uber、Square(移動支付公司)等大型公司用來構建可擴展的、高吞吐量的、且高可靠的實時數據流系統。

例如,在 New Relic 的生產環境中,Kafka 群集每秒能夠處理超過 1500 萬條消息,而且其數據聚合率接近 1Tbps。

可見,Kafka 大幅簡化了對於數據流的處理,因此它也獲得了眾多應用開發人員和數據管理專家的青睞。

然而,在大型系統中 Kafka 的應用會比較復雜。如果您的 Consumers 無法跟上數據流的話,各種消息往往在未被查看之前就已經消失掉了。

同時,它在自動化數據保留方面的限制,高流量的發布+訂閱(publish-subscribe,pub/sub)模式等,可能都會影響到您系統的性能。

可以毫不誇張地說,如果那些存放着數據流的系統無法按需擴容、或穩定性不可靠的話,估計您經常會寢食難安。

為了減少上述復雜性,我在此分享 New Relic 公司為 Kafka 集群在應對高吞吐量方面的 20 項最佳實踐。

我將從如下四個方面進行展開:

  • Partitions(分區)
  • Consumers(消費者)
  • Producers(生產者)
  • Brokers(代理)

快速了解 Kafka 的概念與架構

Kafka 是一種高效的分布式消息系統。在性能上,它具有內置的數據冗余度與彈性,也具有高吞吐能力和可擴展性。

在功能上,它支持自動化的數據保存限制,能夠以“流”的方式為應用提供數據轉換,以及按照“鍵-值(key-value)”的建模關系“壓縮”數據流。

要了解各種最佳實踐,您需要首先熟悉如下關鍵術語:

Message(消息)

Kafka 中的一條記錄或數據單位。每條消息都有一個鍵和對應的一個值,有時還會有可選的消息頭。

Producer(生產者)

Producer 將消息發布到 Kafka 的 topics 上。Producer 決定向 topic 分區的發布方式,如:輪詢的隨機方法、或基於消息鍵(key)的分區算法。

Broker(代理)

Kafka 以分布式系統或集群的方式運行。那么群集中的每個節點稱為一個 Broker。

Topic(主題)

Topic 是那些被發布的數據記錄或消息的一種類別。消費者通過訂閱Topic,來讀取寫給它們的數據。

Topic Partition(主題分區)

不同的 Topic 被分為不同的分區,而每一條消息都會被分配一個 Offset,通常每個分區都會被復制至少一到兩次。

每個分區都有一個 Leader 和存放在各個 Follower 上的一到多個副本(即:數據的副本),此法可防止某個 Broker 的失效。

群集中的所有 Broker 都可以作為 Leader 和 Follower,但是一個 Broker 最多只能有一個 Topic Partition 的副本。Leader 可被用來進行所有的讀寫操作。

Offset(偏移量)

單個分區中的每一條消息都被分配一個 Offset,它是一個單調遞增的整型數,可用來作為分區中消息的唯一標識符。

Consumer(消費者)

Consumer 通過訂閱 Topic partition,來讀取 Kafka 的各種 Topic 消息。然后,消費類應用處理會收到消息,以完成指定的工作。

Consumer group(消費組)

Consumer 可以按照 Consumer group 進行邏輯划分。Topic Partition 被均衡地分配給組中的所有 Consumers。

因此,在同一個 Consumer group 中,所有的 Consumer 都以負載均衡的方式運作。

換言之,同一組中的每一個 Consumer 都能看到每一條消息。如果某個 Consumer 處於“離線”狀態的話,那么該分區將會被分配給同組中的另一個 Consumer。這就是所謂的“再均衡(rebalance)”。

當然,如果組中的 Consumer 多於分區數,則某些 Consumer 將會處於閑置的狀態。

相反,如果組中的 Consumer 少於分區數,則某些 Consumer 會獲得來自一個以上分區的消息。

Lag(延遲)

當 Consumer 的速度跟不上消息的產生速度時,Consumer 就會因為無法從分區中讀取消息,而產生延遲。

延遲表示為分區頭后面的 Offset 數量。從延遲狀態(到“追趕上來”)恢復正常所需要的時間,取決於 Consumer 每秒能夠應對的消息速度。

其公式如下:time = messages / (consume rate per second - produce rate per second)

針對 Partitions 的最佳實踐

①了解分區的數據速率,以確保提供合適的數據保存空間

此處所謂“分區的數據速率”是指數據的生成速率。換言之,它是由“平均消息大小”乘以“每秒消息數”得出的數據速率決定了在給定時間內,所能保證的數據保存空間的大小(以字節為單位)。

如果您不知道數據速率的話,則無法正確地計算出滿足基於給定時間跨度的數據,所需要保存的空間大小。

同時,數據速率也能夠標識出單個 Consumer 在不產生延時的情況下,所需要支持的最低性能值。

②除非您有其他架構上的需要,否則在寫 Topic 時請使用隨機分區

在您進行大型操作時,各個分區在數據速率上的參差不齊是非常難以管理的。

其原因來自於如下三個方面:

  • 首先,“熱”(有較高吞吐量)分區上的 Consumer 勢必會比同組中的其他 Consumer 處理更多的消息,因此很可能會導致出現在處理上和網絡上的瓶頸。
  • 其次,那些為具有最高數據速率的分區,所配置的最大保留空間,會導致Topic 中其他分區的磁盤使用量也做相應地增長。
  • 第三,根據分區的 Leader 關系所實施的最佳均衡方案,比簡單地將 Leader 關系分散到所有 Broker 上,要更為復雜。在同一 Topic 中,“熱”分區會“承載”10 倍於其他分區的權重。

有關 Topic Partition 的使用,可以參閱《Kafka Topic Partition的各種有效策略》https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/。

針對 Consumers 的最佳實踐

③如果 Consumers 運行的是比 Kafka 0.10 還要舊的版本,那么請馬上升級

在 0.8.x 版中,Consumer 使用 Apache ZooKeeper 來協調 Consumer group,而許多已知的 Bug 會導致其長期處於再均衡狀態,或是直接導致再均衡算法的失敗(我們稱之為“再均衡風暴”)。

因此在再均衡期間,一個或多個分區會被分配給同一組中的每個 Consumer。

而在再均衡風暴中,分區的所有權會持續在各個 Consumers 之間流轉,這反而阻礙了任何一個 Consumer 去真正獲取分區的所有權。

④調優 Consumer 的套接字緩沖區(socket buffers),以應對數據的高速流入

在 Kafka 的 0.10.x 版本中,參數 receive.buffer.bytes 的默認值為 64KB。而在 Kafka 的 0.8.x 版本中,參數 socket.receive.buffer.bytes 的默認值為 100KB。

這兩個默認值對於高吞吐量的環境而言都太小了,特別是如果 Broker 和 Consumer 之間的網絡帶寬延遲積(bandwidth-delay product)大於局域網(local areanetwork,LAN)時。

對於延遲為 1 毫秒或更多的高帶寬的網絡(如 10Gbps 或更高),請考慮將套接字緩沖區設置為 8 或 16MB。

如果您的內存不足,也至少考慮設置為 1MB。當然,您也可以設置為 -1,它會讓底層操作系統根據網絡的實際情況,去調整緩沖區的大小。

但是,對於需要啟動“熱”分區的 Consumers 來說,自動調整可能不會那么快。

⑤設計具有高吞吐量的 Consumers,以便按需實施背壓(back-pressure)

通常,我們應該保證系統只去處理其能力范圍內的數據,而不要超負荷“消費”,進而導致進程中斷“掛起”,或出現 Consume group 的溢出。

如果是在 Java 虛擬機(JVM)中運行,Consumers 應當使用固定大小的緩沖區,而且最好是使用堆外內存(off-heap)。請參見 Disruptor 模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf

固定大小的緩沖區能夠阻止 Consumer 將過多的數據拉到堆棧上,以至於 JVM 花費掉其所有的時間去執行垃圾回收,進而無法履行其處理消息的本質工作。

⑥在 JVM 上運行各種 Consumers 時,請警惕垃圾回收對它們可能產生的影響

例如,長時間垃圾回收的停滯,可能導致 ZooKeeper 的會話被丟棄、或 Consumer group 處於再均衡狀態。

對於 Broker 來說也如此,如果垃圾回收停滯的時間太長,則會產生集群掉線的風險。

針對 Producers 的最佳實踐

⑦配置 Producer,以等待各種確認

籍此 Producer 能夠獲知消息是否真正被發送到了 Broker 的分區上。在 Kafka 的 0.10.x 版本上,其設置是 Acks;而在 0.8.x 版本上,則為 request.required.acks。

Kafka 通過復制,來提供容錯功能,因此單個節點的故障、或分區 Leader 關系的更改不會影響到系統的可用性。

如果您沒有用 Acks 來配置 Producer(或稱“fireand forget”)的話,則消息可能會悄然丟失。

⑧為各個 Producer 配置 Retries

其默認值為 3,當然是非常低的。不過,正確的設定值取決於您的應用程序,即:就那些對於數據丟失零容忍的應用而言,請考慮設置為 Integer.MAX_VALUE(有效且最大)。

這樣將能夠應對 Broker 的 Leader 分區出現無法立刻響應 Produce 請求的情況。

⑨為高吞吐量的 Producer,調優緩沖區的大小

特別是 buffer.memory 和 batch.size(以字節為單位)。由於 batch.size 是按照分區設定的,而 Producer 的性能和內存的使用量,都可以與 Topic 中的分區數量相關聯。

因此,此處的設定值將取決於如下幾個因素:

  • Producer 數據速率(消息的大小和數量)
  • 要生成的分區數
  • 可用的內存量

請記住,將緩沖區調大並不總是好事,如果 Producer 由於某種原因而失效了(例如,某個 Leader 的響應速度比確認還要慢),那么在堆內內存(on-heap)中的緩沖的數據量越多,其需要回收的垃圾也就越多。

⑩檢測應用程序,以跟蹤諸如生成的消息數、平均消息大小、以及已使用的消息數等指標

針對 Brokers 的最佳實踐

⑪在各個 Brokers 上,請壓縮 Topics 所需的內存和 CPU 資源。

日志壓縮(請參見https://kafka.apache.org/documentation/#compaction)需要各個 Broker 上的堆棧(內存)和 CPU 周期都能成功地配合實現而如果讓那些失敗的日志壓縮數據持續增長的話,則會給 Brokers 分區帶來風險。

您可以在 Broker 上調整 log.cleaner.dedupe.buffer.size 和 log.cleaner.threads 這兩個參數,但是請記住,這兩個值都會影響到各個 Brokers 上的堆棧使用。

如果某個 Broker 拋出 OutOfMemoryError 異常,那么它將會被關閉、並可能造成數據的丟失。

而緩沖區的大小和線程的計數,則取決於需要被清除的 Topic Partition 數量、以及這些分區中消息的數據速率與密鑰的大小。

對於 Kafka 的 0.10.2.1 版本而言,通過 ERROR 條目來監控日志清理程序的日志文件,是檢測其線程可能出現問題的最可靠方法。

⑫通過網絡吞吐量來監控 Brokers

請監控發向(transmit,TX)和收向(receive,RX)的流量,以及磁盤的 I/O、磁盤的空間、以及 CPU 的使用率,而且容量規划是維護群集整體性能的關鍵步驟。

⑬在群集的各個 Brokers 之間分配分區的 Leader 關系

Leader 通常會需要大量的網絡 I/O 資源。例如,當我們將復制因子(replication factor)配置為 3、並運行起來時。

Leader 必須首先獲取分區的數據,然后將兩套副本發送給另兩個 Followers,進而再傳輸到多個需要該數據的 Consumers 上。

因此在該例子中,單個 Leader 所使用的網絡 I/O,至少是 Follower 的四倍。而且,Leader 還可能需要對磁盤進行讀操作,而 Follower 只需進行寫操作。

⑭不要忽略監控 Brokers 的 in-sync replica(ISR)shrinks、under-replicatedpartitions 和 unpreferred leaders

這些都是集群中潛在問題的跡象。例如,單個分區頻繁出現 ISR 收縮,則暗示着該分區的數據速率超過了 Leader 的能力,已無法為 Consumer 和其他副本線程提供服務了。

⑮按需修改 Apache Log4j 的各種屬性

詳細內容可以參考:https://github.com/apache/kafka/blob/trunk/config/log4j.properties

Kafka 的 Broker 日志記錄會耗費大量的磁盤空間,但是我們卻不能完全關閉它。

因為有時在發生事故之后,需要重建事件序列,那么 Broker 日志就會是我們最好的、甚至是唯一的方法。

⑯禁用 Topic 的自動創建,或針對那些未被使用的 Topics 建立清除策略

例如,在設定的 x 天內,如果未出現新的消息,您應該考慮該 Topic 是否已經失效,並將其從群集中予以刪除。此舉可避免您花時間去管理群集中被額外創建的元數據。

⑰對於那些具有持續高吞吐量的 Brokers,請提供足夠的內存,以避免它們從磁盤子系統中進行讀操作

我們應盡可能地直接從操作系統的緩存中直接獲取分區的數據。然而,這就意味着您必須確保自己的 Consumers 能夠跟得上“節奏”,而對於那些延遲的 Consumer 就只能強制 Broker 從磁盤中讀取了。

⑱對於具有高吞吐量服務級別目標(service level objectives,SLOs)的大型群集,請考慮為 Brokers 的子集隔離出不同的 Topic

至於如何確定需要隔離的 Topics,則完全取決於您自己的業務需要。例如,您有一些使用相同群集的聯機事務處理(multipleonline transaction processing,OLTP)系統。

那么將每個系統的 Topics 隔離到不同 Brokers 子集中,則能夠有助於限制潛在事件的影響半徑。

⑲在舊的客戶端上使用新的 Topic 消息格式。應當代替客戶端,在各個 Brokers 上加載額外的格式轉換服務

當然,最好還是要盡量避免這種情況的發生。

⑳不要錯誤地認為在本地主機上測試好 Broker,就能代表生產環境中的真實性能了

要知道,如果使用復制因子為 1,並在環回接口上對分區所做的測試,是與大多數生產環境截然不同的。

在環回接口上網絡延遲幾乎可以被忽略的,而在不涉及到復制的情況下,接收 Leader 確認所需的時間則同樣會出現巨大的差異。

總結

希望上述各項建議能夠有助於您更有效地去使用 Kafka。如果您想提高自己在 Kafka 方面的專業知識,請進一步查閱 Kafka 配套文檔中的“操作”部分,其中包含了有關操作群集等實用信息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM