Kafka consumer處理大消息數據問題


案例分析

處理kafka consumer的程序的時候,發現如下錯誤:

ERROR [2017-01-12 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

如上log可以看出,問題就是有一個較大的消息數據在codeTopic的partition 3上,然后consumer未能消費,提示我可以減小broker允許進入的消息數據的大小,或者增大consumer程序消費數據的大小。

從log上來看一目了然,如果要解決當前問題的話,

  1. 減小broker消息體大小(設置message.max.bytes參數);
  2. 增大consumer獲取數據信息大小(設置fetch.message.max.bytes參數)。默認broker消息體大小為1000000字節即為1M大小。

消費者方面:fetch.message.max.bytes——>這將決定消費者可以獲取的數據大小。
broker方面:replica.fetch.max.bytes——>這將允許broker的副本發送消息在集群並確保消息被正確地復制。如果這是太小,則消息不會被復制,因此,消費者永遠不會看到的消息,因為消息永遠不會承諾(完全復制)。
broker方面:message.max.bytes——>可以接受數據生產者最大消息數據大小。

由我的場景來看較大的消息體已經進入到了kafka,我這里要解決這個問題,只需要增加consumer的fetch.message.max.bytes數值就好。我單獨把那條數據消費出來,寫到一個文件中發現那條消息大小為1.5M左右,為了避免再次發生這種問題我把consumer程序的fetch.message.max.bytes參數調節為了3072000即為3M,重啟consumer程序,查看log一切正常,解決這個消費錯誤到此結束,下面介紹一下kafka針對大數據處理的思考。

kafka的設計初衷

Kafka設計的初衷是迅速處理小量的消息,一般10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,我們需要處理更大的消息,比如XML文檔或JSON內容,一個消息差不多有10-100M,這種情況下,Kakfa應該如何處理?

針對這個問題,可以參考如下建議:

  • 最好的方法是不直接傳送這些大的數據。如果有共享存儲,如NAS, HDFS, S3等,可以把這些大的文件存放到共享存儲,然后使用Kafka來傳送文件的位置信息。

  • 第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片為10K大小,使用分區主鍵確保一個大消息的所有部分會被發送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分重新還原為原始的消息。

  • 第三,Kafka的生產端可以壓縮消息,如果原始消息是XML,當通過壓縮之后,消息可能會變得不那么大。在生產端的配置參數中使用compression.codeccommpressed.topics可以開啟壓縮功能,壓縮算法可以使用GZipSnappy

不過如果上述方法都不是你需要的,而你最終還是希望傳送大的消息,那么,則可以在kafka中設置下面一些參數:

broker 配置

message.max.bytes (默認:1000000) – broker能接收消息的最大字節數,這個值應該比消費端的fetch.message.max.bytes更小才對,否則broker就會因為消費端無法使用這個消息而掛起。
log.segment.bytes (默認: 1GB) – kafka數據文件的大小,確保這個數值大於一個消息的長度。一般說來使用默認值即可(一般一個消息很難大於1G,因為這是一個消息系統,而不是文件系統)。
replica.fetch.max.bytes (默認: 1MB) – broker可復制的消息的最大字節數。這個值應該比message.max.bytes大,否則broker會接收此消息,但無法將此消息復制出去,從而造成數據丟失。

Consumer 配置

fetch.message.max.bytes (默認 1MB) – 消費者能讀取的最大消息。這個值應該大於或等於message.max.bytes。所以,如果你一定要選擇kafka來傳送大的消息,還有些事項需要考慮。要傳送大的消息,不是當出現問題之后再來考慮如何解決,而是在一開始設計的時候,就要考慮到大消息對集群和主題的影響。

性能: 根據前面提到的性能測試,kafka在消息為10K時吞吐量達到最大,更大的消息會降低吞吐量,在設計集群的容量時,尤其要考慮這點。
可用的內存和分區數:Brokers會為每個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則需要差不多1G的內存,確保 分區數最大的消息不會超過服務器的內存,否則會報OOM錯誤。同樣地,消費端的fetch.message.max.bytes指定了最大消息需要的內存空間,同樣,分區數最大需要內存空間 不能超過服務器的內存。所以,如果你有大的消息要傳送,則在內存一定的情況下,只能使用較少的分區數或者使用更大內存的服務器。
垃圾回收:到現在為止,我在kafka的使用中還沒發現過此問題,但這應該是一個需要考慮的潛在問題。更大的消息會讓GC的時間更長(因為broker需要分配更大的塊),隨時關注GC的日志和服務器的日志信息。如果長時間的GC導致kafka丟失了zookeeper的會話,則需要配置zookeeper.session.timeout.ms參數為更大的超時時間。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM