Kafka常見問題及解決方法


前言

Apache Kafka是一款優秀的開源消息中間件,主要應用於活動跟蹤、消息穿透、日志、流處理等場景。我們使用該產品時,首先應當需要了解該產品的特性,以及產品的說明
但是由於官方文檔較多,實際在使用的過程中,quick start往往是我們接觸的第一步,但是quick start的配置實在是太過簡陋,從而在實際的使用過程甚至在生產環境中發生一些嚴重的問題。話不多說,以下內容為實踐中遇到的問題及解決辦法詳解。

正文

1、Kafka集群無法在跨網絡的環境中正常工作

kafka在實際使用的過程中,同樣的配置,在不同的網絡環境下,可能會導致無法正常工作,我們應該分清內網和外網的區別。

在Kafka中涉及到網絡的核心配置參數主要為兩個

參數名 配置參考 說明 注意
listeners PLAINTEXT://hostname:9092 主要用來定義Kafka Broker的Listener的配置項。hostname如果設置為0.0.0.0則綁定所有的網卡地址;如果hostname為空則綁定默認的網卡。如果沒有配置hostname則默認為java.net.InetAddress.getCanonicalHostName() 如果只設置該參數,那么就無法進行跨網絡的訪問,只有內網中的服務可以用
advertised.listeners PLAINTEXT://hostname:9092 作用主要是向生產者和消費者公布主機名和端口。如果不設置該值,那么將會默認使用listeners的值 在docker中或者在虛擬機上部署kafka集群,這種情況下是需要用到 advertised.listeners,這樣可以確保外部的生產者和消費者能夠正確的使用kafka服務。

核心配置示例

listeners: INSIDE://:9092,OUTSIDE://0.0.0.0:9094  
advertised.listeners: INSIDE://:9092,OUTSIDE://<網卡ip或主機名>:端口   
listener.security.protocol.map: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"  
inter.broker.listener.name: "INSIDE"

說明:listener.security.protocol.mapinter.broker.listener.name分別表示的是協議的傳輸方式(以上為明文)和監聽的borker。

2、Kafka集群數據節點不均衡

錯誤的創建topic,可能會導致生產者無法均衡或隨機的往kafka集群中推送數據,消費者無法均衡的消費數據,kafka集群中部分節點數據不均衡,系統不穩定,無法有效的高可用。

創建topic主要有兩種方式

  • 手動創建
    bin/kafka-topics.sh --create --zookeeper {hostname:port} --replication-factor 3 --partitions 6 --topic test
  1. replication-factor表示副本數量,建議至少設置為2,一般是3,最高設置為4。合理的設置該數量可以保證系統更穩定(允許N-1個broker宕機),但是更多的副本(如果acks=all,則會造成較高的延時),系統磁盤的使用率會更高(一般若是為3,則相對於為2時,會占據更多50% 的磁盤空間)
  2. partitions表示分區,kafka通過分區策略,將不同的分區分配在一個集群中的broker上,一般會分散在不同的broker上,當只有一個broker時,所有的分區就只分配到該Broker上。一般來說 Kafka 不能有太多的 Partition,一個broker不應該承載超過2000 到 4000 個partitions(考慮此broker上所有來自不同topics的partitions)。
    同時,一個Kafka集群上brokers中所有的partitions總數最多不應超過20,000個,集群節點數量低於6的時候,我們通常設置的值為2* broker數。此准則基於的原理是:在有broker宕機后,zookeeper需要重新做選舉。若是partitions數目過多,則需要執行大量的選舉策略。
  • 自動創建
    需要將auto.create.topics.enable設置為true,kafka發現該topic不存在的話,會按默認配置自動創建topic。
    配置示例:
num.partitions=6 #自動創建的partitions值為6
default.replication.factor=3 #自動創建的replication_factor值為3
auto.create.topics.enable=true # 開啟自動創建

具體的數值設置,可以參考手動創建的中的詳細說明,然后通過測試結果進行調整。

3、Kafka集群日志數據過大堆積磁盤

生產者在推送數據后,kafka通常會在data目錄下對應的topic數據目錄中生成日志信息,隨着時間的增長,如果沒有對日志作清理動作,那么必定導致磁盤的不可用。

清理策略

  • log.cleanup.policy=delete,kafka日志的清理策略,默認是delete,就是根據配置的時間空間來清理日志;還可以配置成compact,當舊數據的回收時間或者尺寸限制到達時,會進行日志壓縮。
# 需要自己根據實際情況設置
log.retention.bytes=-1
# 默認的保留時間是7天
log.retention.hours=168

值得注意的是log.retention.bytes表示的是每個topic下每個partition保存數據的總量;注意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic保存的數據總量。同時注意:如果log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會造成刪除一個段文件。這項設置可以由每個topic設置時進行覆蓋。

4、Kafka客戶端的注意事項

以上是kafka常見的問題及解決辦法,想要用好kafka,還必須要了解相對應的客戶端提供的各種參數含義。
以下以go作為示例,采用的是sarama客戶端中值得關注的如下:
生產者配置Producer.Partitioner分區器的選擇策略,同步異步模塊的實現方式差異。

生產者: 主要分為同步模式和異步模式

  1. 同步模式:需等待返回成功后,阻塞其他邏輯執行
  config := sarama.NewConfig()  //實例化sarama的Config
  config.Producer.Return.Successes = true  //開啟消息發送成功后通知
  config.Producer.Partitioner = sarama.NewRandomPartitioner //隨機分區器 可選擇不同的策略,如輪詢等
  client,err := sarama.NewClient([]string{"127.0.0.1:9092"}, config) //初始化
  defer client.Close()
  if err != nil {panic(err)}
  producer,err := sarama.NewSyncProducerFromClient(client) // 同步
  if err!=nil {panic(err)}
  partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go sync message")})
  if err != nil {
     panic(err)
  }
  1. 異步模式:生產者不等待成功直接返回,不阻塞其他邏輯執行
   config := sarama.NewConfig() //實例化sarama的Config
   config.Producer.Return.Successes = true //開啟消息發送成功后通知
   config.Producer.Partitioner = sarama.NewRandomPartitioner //隨機分區器 可選擇不同的策略,如輪詢等
   client, err := sarama.NewClient([]{"127.0.0.1:9092"}, config)
   if err != nil {
       panic(err)
   }
   producer, err := sarama.NewAsyncProducerFromClient //異步
   if err != nil {
       panic(err)
   }
   defer producer.Close()
   producer.Input() <- &sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go async message")}
   select {
           case msg := <-producer.Successes(): // 如果config.Producer.Return.Successes 設置為false 那么無需獲取報告狀態,否則必須獲取該狀態
               fmt.Printf("message successes: [%s]\n",msg.Value)
           case err := <-producer.Errors():
               fmt.Println("message failure: ", err)
           default:
               fmt.Println("message default",)
   }

消費者

消費者配置Offsets.Initial偏移量的選擇,從最舊還是最新的消息開始消費,只能是sarama.OffsetOldestsarama.OffsetNewest
Offsets.ResetOffsets :如果程序重啟,true表示不從上次中斷的位置消費,false表示從上次中斷的位置消費。

總結

本文主要是記錄在kafka開發過程中常見的問題解決辦法,通過不斷總結積累問題解決辦法,在遇到同類問題時能夠快速的復盤,以最快的速度解決問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM