Kafka設計的初衷是迅速處理短小的消息,一般10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,我們需要處理更大的消息,比如XML文檔或JSON內容,一個消息差不多有10-100M,這種情況下,Kakfa應該如何處理?
針對這個問題,有以下幾個建議:
不過如果上述方法都不是你需要的,而你最終還是希望傳送大的消息,那么,則可以在kafka中設置下面一些參數:
broker 配置:
Consumer 配置:
針對這個問題,有以下幾個建議:
- 最好的方法是不直接傳送這些大的數據。如果有共享存儲,如NAS, HDFS, S3等,可以把這些大的文件存放到共享存儲,然后使用Kafka來傳送文件的位置信息。
- 第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片為10K大小,使用分區主鍵確保一個大消息的所有部分會被發送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分重新還原為原始的消息。
- 第三,Kafka的生產端可以壓縮消息,如果原始消息是XML,當通過壓縮之后,消息可能會變得不那么大。在生產端的配置參數中使用compression.codec和commpressed.topics可以開啟壓縮功能,壓縮算法可以使用GZip或Snappy。
不過如果上述方法都不是你需要的,而你最終還是希望傳送大的消息,那么,則可以在kafka中設置下面一些參數:
broker 配置:
- message.max.bytes (默認:1000000) – broker能接收消息的最大字節數,這個值應該比消費端的fetch.message.max.bytes更小才對,否則broker就會因為消費端無法使用這個消息而掛起。
- log.segment.bytes (默認: 1GB) – kafka數據文件的大小,確保這個數值大於一個消息的長度。一般說來使用默認值即可(一般一個消息很難大於1G,因為這是一個消息系統,而不是文件系統)。
- replica.fetch.max.bytes (默認: 1MB) – broker可復制的消息的最大字節數。這個值應該比message.max.bytes大,否則broker會接收此消息,但無法將此消息復制出去,從而造成數據丟失。
Consumer 配置:
- fetch.message.max.bytes (默認 1MB) – 消費者能讀取的最大消息。這個值應該大於或等於message.max.bytes。
所以,如果你一定要選擇kafka來傳送大的消息,還有些事項需要考慮。要傳送大的消息,不是當出現問題之后再來考慮如何解決,而是在一開始設計的時候,就要考慮到大消息對集群和主題的影響。
- 性能: 根據前面提到的性能測試,kafka在消息為10K時吞吐量達到最大,更大的消息會降低吞吐量,在設計集群的容量時,尤其要考慮這點。
- 可用的內存和分區數:Brokers會為每個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則需要差不多1G的內存,確保 分區數*最大的消息不會超過服務器的內存,否則會報OOM錯誤。同樣地,消費端的fetch.message.max.bytes指定了最大消息需要的內存空間,同樣,分區數*最大需要內存空間 不能超過服務器的內存。所以,如果你有大的消息要傳送,則在內存一定的情況下,只能使用較少的分區數或者使用更大內存的服務器。
- 垃圾回收:到現在為止,我在kafka的使用中還沒發現過此問題,但這應該是一個需要考慮的潛在問題。更大的消息會讓GC的時間更長(因為broker需要分配更大的塊),隨時關注GC的日志和服務器的日志信息。如果長時間的GC導致kafka丟失了zookeeper的會話,則需要配置zookeeper.session.timeout.ms參數為更大的超時時間。