kafka發送超大消息設置


  最近開發一cdc框架,為了測試極端情況,需要kafka傳遞100萬條數據過去,1個G左右,由於其他環節限制,不便進行拆包(注:測下來,大包走kafka不一定性能更好,甚至可能更低)。

  測試百萬以上的變更數據時,報消息超過kafka broker允許的最大值,因此需要修改如下參數,保證包能夠正常發送:

  • socket.request.max.bytes=2147483647    # 設置了socket server接收的最大請求大小
  • log.segment.bytes=2147483647              # kafka數據文件的大小,確保這個數值大於一個消息的長度。一般說來使用默認值即可(一般一個消息很難大於1G,因為這是一個消息系統,而不是文件系統)。
  • message.max.bytes=2147483647             # 設置了kafka server接收的最大消息大小,應小於等於socket.request.max.bytes
  • replica.fetch.max.bytes=2147483647         #每個分區試圖獲取的消息字節數。要大於等於message.max.bytes,否則broker會接收此消息,但無法將此消息復制出去,從而造成數據丟失。
  • fetch.message.max.bytes=2147483647      #每個提取請求中為每個主題分區提取的消息字節數。要大於等於message.max.bytes,否則broker就會因為消費端無法使用這個消息而掛起。

生產者可以如下設定:

kafkaProps.put("max.request.size", 2147483647);    # 要小於 message.max.bytes,也可以設置在producer.properties配置文件中
kafkaProps.put("buffer.memory", 2147483647);
// kafkaProps.put("timeout.ms", 3000000); # 該選項在最新版本中已經不再起作用
kafkaProps.put("request.timeout.ms", 30000000);

消費者設定如下:

props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");

  各參數的含義可以參考kafka官方文檔https://kafka.apache.org/documentation/#configuration。

  kafka基礎知識體系,請參考LZ學習筆記kafka學習指南(總結版)

  注,各參數對內存的影響如下:Brokers會為每個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則需要差不多1G的內存,確保 分區數*最大的消息不會超過服務器的內存,否則會報OOM錯誤。同樣地,消費端的fetch.message.max.bytes指定了最大消息需要的內存空間,同樣,分區數*最大需要內存空間 不能超過服務器的內存。所以,如果你有大的消息要傳送,則在內存一定的情況下,只能使用較少的分區數或者使用更大內存的服務器。

  雖然上面的方法可以奏效,但是並不推薦。Kafka設計的初衷是迅速處理短小的消息,一般10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,我們需要處理更大的消息,比如XML文檔或JSON內容,一個消息差不多有10-100M,這種情況下,Kakfa應該如何處理?

針對這個問題,有以下幾個建議:

  1.   最好的方法是不直接傳送這些大的數據。如果有共享存儲,如NAS, HDFS, S3等,可以把這些大的文件存放到共享存儲,然后使用Kafka來傳送文件的位置信息。
  2.   第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片為10K大小,使用分區主鍵確保一個大消息的所有部分會被發送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分重新還原為原始的消息。
  3.   第三,Kafka的生產端可以壓縮消息,如果原始消息是XML,當通過壓縮之后,消息可能會變得不那么大。在生產端的配置參數中使用compression.codec和commpressed.topics可以開啟壓縮功能,壓縮算法可以使用GZip或Snappy。

  上面這些值太大還會造成一個問題,就是消息沒有在指定時間內(max.poll.interval.ms(默認300秒))消費完,導致被rebalance,如下:

WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=hsta.cdcGroup] Asynchronous auto-commit of offsets {hs_ta_channel_Metadata-0=OffsetAndMetadata{offset=22, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

  此時可以通過增加max.poll.interval.ms、降低max.poll.records、消費者新開線程開解決。可參見:https://www.cnblogs.com/muxi0407/p/11697709.html、https://www.jianshu.com/p/271f88f06eb3、https://blog.csdn.net/shibuwodai_/article/details/80678717。

  而kafka本身有個bug(服務器端的rebalance.timeout.ms(默認60秒)不生效),這會導致消費者組的rebalance時間比較長,所以這是需要注意的,參見https://blog.csdn.net/u013200380/article/details/87868696。

  kafka所有配置可參考:https://docs.cloudera.com/documentation/kafka/latest/topics/kafka_performance.htmlhttps://kafka.apache.org/documentation/#configuration


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM