前言
Apache Kafka是一款優秀的開源消息中間件,主要應用於活動跟蹤、消息穿透、日志、流處理等場景。我們使用該產品時,首先應當需要了解該產品的特性,以及產品的說明。
但是由於官方文檔較多,實際在使用的過程中,quick start
往往是我們接觸的第一步,但是quick start
的配置實在是太過簡陋,從而在實際的使用過程甚至在生產環境中發生一些嚴重的問題。話不多說,以下內容為實踐中遇到的問題及解決辦法詳解。
正文
1、Kafka集群無法在跨網絡的環境中正常工作
kafka在實際使用的過程中,同樣的配置,在不同的網絡環境下,可能會導致無法正常工作,我們應該分清內網和外網的區別。
在Kafka中涉及到網絡的核心配置參數主要為兩個
參數名 | 配置參考 | 說明 | 注意 |
---|---|---|---|
listeners | PLAINTEXT://hostname :9092 |
主要用來定義Kafka Broker的Listener的配置項。hostname 如果設置為0.0.0.0 則綁定所有的網卡地址;如果hostname 為空則綁定默認的網卡。如果沒有配置hostname 則默認為java.net.InetAddress.getCanonicalHostName() 。 |
如果只設置該參數,那么就無法進行跨網絡的訪問,只有內網中的服務可以用 |
advertised.listeners | PLAINTEXT://hostname :9092 |
作用主要是向生產者和消費者公布主機名和端口。如果不設置該值,那么將會默認使用listeners 的值 |
在docker中或者在虛擬機上部署kafka集群,這種情況下是需要用到 advertised.listeners ,這樣可以確保外部的生產者和消費者能夠正確的使用kafka服務。 |
核心配置示例
listeners: INSIDE://:9092,OUTSIDE://0.0.0.0:9094
advertised.listeners: INSIDE://:9092,OUTSIDE://<網卡ip或主機名>:端口
listener.security.protocol.map: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
inter.broker.listener.name: "INSIDE"
說明:
listener.security.protocol.map
和inter.broker.listener.name
分別表示的是協議的傳輸方式(以上為明文)和監聽的borker。
2、Kafka集群數據節點不均衡
錯誤的創建topic,可能會導致生產者無法均衡或隨機的往kafka集群中推送數據,消費者無法均衡的消費數據,kafka集群中部分節點數據不均衡,系統不穩定,無法有效的高可用。
創建topic主要有兩種方式
- 手動創建
bin/kafka-topics.sh --create --zookeeper {hostname:port} --replication-factor 3 --partitions 6 --topic test
replication-factor
表示副本數量,建議至少設置為2,一般是3,最高設置為4。合理的設置該數量可以保證系統更穩定(允許N-1個broker宕機),但是更多的副本(如果acks=all,則會造成較高的延時),系統磁盤的使用率會更高(一般若是為3,則相對於為2時,會占據更多50% 的磁盤空間)partitions
表示分區,kafka通過分區策略,將不同的分區分配在一個集群中的broker上,一般會分散在不同的broker上,當只有一個broker時,所有的分區就只分配到該Broker上。一般來說 Kafka 不能有太多的 Partition,一個broker不應該承載超過2000 到 4000 個partitions(考慮此broker上所有來自不同topics的partitions)。
同時,一個Kafka集群上brokers中所有的partitions總數最多不應超過20,000個,集群節點數量低於6的時候,我們通常設置的值為2* broker數。此准則基於的原理是:在有broker宕機后,zookeeper需要重新做選舉。若是partitions數目過多,則需要執行大量的選舉策略。
- 自動創建
需要將auto.create.topics.enable
設置為true
,kafka發現該topic不存在的話,會按默認配置自動創建topic。
配置示例:
num.partitions=6 #自動創建的partitions值為6
default.replication.factor=3 #自動創建的replication_factor值為3
auto.create.topics.enable=true # 開啟自動創建
具體的數值設置,可以參考手動創建的中的詳細說明,然后通過測試結果進行調整。
3、Kafka集群日志數據過大堆積磁盤
生產者在推送數據后,kafka通常會在data目錄下對應的topic數據目錄中生成日志信息,隨着時間的增長,如果沒有對日志作清理動作,那么必定導致磁盤的不可用。
清理策略
log.cleanup.policy=delete
,kafka日志的清理策略,默認是delete,就是根據配置的時間空間來清理日志;還可以配置成compact,當舊數據的回收時間或者尺寸限制到達時,會進行日志壓縮。
# 需要自己根據實際情況設置
log.retention.bytes=-1
# 默認的保留時間是7天
log.retention.hours=168
值得注意的是log.retention.bytes
表示的是每個topic下每個partition保存數據的總量;注意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic保存的數據總量。同時注意:如果log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會造成刪除一個段文件。這項設置可以由每個topic設置時進行覆蓋。
4、Kafka客戶端的注意事項
以上是kafka常見的問題及解決辦法,想要用好kafka,還必須要了解相對應的客戶端提供的各種參數含義。
以下以go作為示例,采用的是sarama客戶端中值得關注的如下:
生產者配置Producer.Partitioner
分區器的選擇策略,同步異步模塊的實現方式差異。
生產者: 主要分為同步模式和異步模式
- 同步模式:需等待返回成功后,阻塞其他邏輯執行
config := sarama.NewConfig() //實例化sarama的Config
config.Producer.Return.Successes = true //開啟消息發送成功后通知
config.Producer.Partitioner = sarama.NewRandomPartitioner //隨機分區器 可選擇不同的策略,如輪詢等
client,err := sarama.NewClient([]string{"127.0.0.1:9092"}, config) //初始化
defer client.Close()
if err != nil {panic(err)}
producer,err := sarama.NewSyncProducerFromClient(client) // 同步
if err!=nil {panic(err)}
partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go sync message")})
if err != nil {
panic(err)
}
- 異步模式:生產者不等待成功直接返回,不阻塞其他邏輯執行
config := sarama.NewConfig() //實例化sarama的Config
config.Producer.Return.Successes = true //開啟消息發送成功后通知
config.Producer.Partitioner = sarama.NewRandomPartitioner //隨機分區器 可選擇不同的策略,如輪詢等
client, err := sarama.NewClient([]{"127.0.0.1:9092"}, config)
if err != nil {
panic(err)
}
producer, err := sarama.NewAsyncProducerFromClient //異步
if err != nil {
panic(err)
}
defer producer.Close()
producer.Input() <- &sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go async message")}
select {
case msg := <-producer.Successes(): // 如果config.Producer.Return.Successes 設置為false 那么無需獲取報告狀態,否則必須獲取該狀態
fmt.Printf("message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
fmt.Println("message failure: ", err)
default:
fmt.Println("message default",)
}
消費者
消費者配置
Offsets.Initial
偏移量的選擇,從最舊還是最新的消息開始消費,只能是sarama.OffsetOldest
或sarama.OffsetNewest
。
Offsets.ResetOffsets
:如果程序重啟,true
表示不從上次中斷的位置消費,false
表示從上次中斷的位置消費。
總結
本文主要是記錄在kafka開發過程中常見的問題解決辦法,通過不斷總結積累問題解決辦法,在遇到同類問題時能夠快速的復盤,以最快的速度解決問題。