由於項目原因,最近經常碰到Kafka消息隊列某topic在集群宕機重啟后無法消費的情況。碰到這種情況,有三步去判斷原因所在:
step A:如果用kafka串口(即console-consumer)是可以正常消費該topic,則排除kafka集群出現故障
step B:若平台業務能正常消費其他topic的消息,則排除平台業務代碼邏輯問題
step C:不到萬不得已,則只能手動刪除kafka的對應topic的Log,但是清理Kafka Log又不能單純的去刪除中間環節產生的日志,中間關聯的很多東西需要手動同時去清理,否則可能會導致刪除后客戶端無法消費的情況。
一、Kafka消費Offset原理
在通過Client端消費Kafka中的消息時,消費的消息會同時在Zookeeper和Kafka Log中保存,如上圖紅線所示。
當手動刪除Kafka某一分片上的消息日志時,如上圖藍線所示,此是只是將Kafka Log中的信息清0了,但是Zookeeper中的Partition和Offset數據依然會記錄。當重新啟動Kafka后,我們會發現如下二種情況:
A、客戶端無法正常用消費;
B、在使用Kafka Consumer Offset Monitor工具進行Kafka監控時會發現Lag(還有多少消息數未讀取(Lag=logSize-Offset))為負數;其中此種情況的刪除操作需要我們重點關注,后面我們也會詳細介紹其對應的操作步驟。
一般正常情況,如果想讓Kafka客戶端正常消費,那么需要Zookeeper和Kafka Log中的記錄保持如上圖黃色所示。
二、Kafka消息日志清除
操作步驟主要包括:
1、停止Kafka運行;
2、刪除Kafka消息日志;
3、修改ZK的偏移量;
4、重啟Kafka;
上述步驟重點介紹其中的關鍵步驟。
第2步:刪除Kafka消息日志時,進入Kafka消息日志路徑(可通過查看$KAFKA_HOME/config/server.properties中的“log.dirs”知曉),刪除相應topic文件夾下所有文件(如:“rm -rf ./topicA”);
第3步:修改ZK的偏移量時,進入ZK的安裝目錄下,運行./bin/zkCli.sh -server (中間以,分割),如果不帶server默認修改的為本機。
示例如下:
A.運行$ZOOKEEPER_HOME/bin/zkCli.sh -server Master:2181,Slave1:2181,Slave2:2181
B.在ZK上運行ls /consumers/對應的分組/offsets/對應的topic,就可以看到此topic下的所有分區了;
通過get /consumers/對應的分組/offsets/對應的topic/對應的分區號,可以查詢到該分區上記錄的offset;
通過set /consumers/對應的分組/offsets/對應的topic/對應的分區號 修改后的值(一般為0,重置),即可完成對offset的修改;
(注意:B步驟中的“/consumers”由實際配置情況決定)
三、重建Topic
操作步驟主要包括如下:
1、刪除Topic;
2、刪除log日志;
3、刪除ZK中的Topic記錄
第一步:刪除Topic
運行$KAFKA_HOME/bin/kafka-topics.sh -delete -zookeeper [zookeeper server] -topic [topic name];如果kafka啟動時加載的配置文件server.properties沒有配置delete.topic.enable = true,那么此時的刪除並不是真正的刪除。而只是把topic標記為:marked for deletion,此時就需要執行第3步的操作;
第三步:刪除ZK中的Topic記錄
示例如下:
A.運行$ZOOKEEPER_HOME/bin/zkCli.sh -server Master:2181,Slave1:2181,Slave2:2181
B.進入/admin/delete_topics目錄下,找到刪除的topic,刪除對應的信息。
四、重新啟動Kafka集群