作者:Jack47
轉載請保留作者和原文出處
歡迎關注我的微信公眾賬號程序員傑克,兩邊的文章會同步,也可以添加我的RSS訂閱源。
交代一下背景:我們的后台系統是一套使用Kafka消息隊列的數據處理管線:Kafka->Logstash->Elasticsearch。這些組件都跑在Docker的容器環境里,我們是基於Kubernetes來編排整個后端的數據處理管線上的容器。Kafka需要暴露在外網里,接收Kafka Producer(filebeat, collectd)發過來的消息。本文是記錄基於Kubernetes在AWS上部署Kafka 0.9.x版本時遇到的問題和排查思路。
為了能夠在外網也能訪問Kafka,Kafka組件對應的Kubernetes ServiceType選用的是NodePort,Kafka集群有三個節點,即Kafka Server有三個Broker。對外暴露的端口是39092,三個Broker對應的外網IP分別是 59.64.11.21
,59.64.11.22
,59.64.11.23
。Kafka組件部署完成后,使用Kafka producer連接Server,配置的kafka bootstrap_servers
是 59.64.11.22:39092
。這里之所以端口使用的是 39092,而非9092,是因為Kubernetes對外暴露的端口,分配的是39092。接着往下看,你會發現這樣會有問題。
connection refused
Kafka client的日志里報錯:
2016-11-22T07:23:33.312102145Z 2016-11-22T07:23:33Z WARN Failed to connect to broker 59.64.11.21:30791: dial tcp 52.198.148.31:30791: getsockopt: connection refused
2016-11-22T07:23:33.312102145Z
2016-11-22T07:23:33.312102145Z 2016-11-22T07:23:33Z WARN kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA *net.OpError=dial tcp 52.198.148.31:30791: getsockopt: connection refused)
第一反應是去看AWS實例的安全組(Security Group),發現忘了允許39092這個端口的接入。配置完成后,重啟Kafka client。
No available broker
接下來遇到了這個錯誤:
2016-11-22T07:23:33.312102145Z 2016-11-22T07:23:33Z WARN kafka message: client/metadata no available broker to send metadata request to
2016-11-22T07:23:33.312102145Z 2016-11-22T07:23:33Z WARN client/brokers resurrecting 1 dead seed brokers
Google一番,然后通過閱讀Kafka的文檔(一定要注意查看對應版本的文檔),發現原因了。
當Kafka broker啟動時,它會在ZK上注冊自己的IP和端口號,客戶端就通過這個IP和端口號來連接。在AWS這種IaaS環境下,由於java.net.InetAddress.getCanonicalHostName
調用拿到的HostName是類似ip-172-31-10-199
這樣的只有內網才能訪問到的主機名,所以默認注冊到ZK上的IP是內網才能訪問的內網IP。此時就需要顯示指定 advertised.host.name, advertised.listeners參數,讓注冊到ZK上的IP是外網IP。
例如對於 59.64.11.22 IP對應的broker,需要在 server.properties 配置文件里增加如下三個配置:
advertised.listeners=PLAINTEXT://59.64.11.22:9092
advertised.host.name=59.64.11.22
advertised.port=9092
估計讀者們也會跟我一樣犯迷糊,為什么需要三個參數來配置IP和端口號呢,用一個advertised.listeners
不就搞定了嗎?后來發現最新版本0.10.x broker配置棄用了advertised.host.name
和 advertised.port
這兩個個配置項,就配置advertised.listeners就可以了。:joy
found some partitions to be leaderless
2016-11-22T02:58:36Z WARN kafka message: client/metadata found some partitions to be leaderless
又是 Read The Fucking Manual,發現Kafka生產者會先連接 bootstrap_servers列表里的某個節點,這台機器只是用來獲取集群的元數據(meta data):拿到topics,partitions和 replicas的信息。查到寫入的topic對應的leader節點(可能不是剛才的那個節點)后,與之建立連接,然后發送實際的數據給leader。有了這個背景知識,上面的這個錯誤信息就不難理解了,說明獲取元數據沒問題,但是連不到對應的leader節點。經過跟同事探討,發現了問題所在:Kafka使用的Kubernetes ServiceType是NodePort
,而NodePort
這種服務類型是有負載均衡的,所以Kafka producer連接 leader節點時,由於有負載均衡,所以實際會連到其他節點上。把Kafka從Kubernetes的普通服務改為了無頭服務(Headless),然后端口要從NodePort
改為HostPort
這種類型。這樣就沒有負載均衡,同時又能從外網訪問Kafka節點。
還是不行
上述改動改完后,發現還是報 found some partitions to be leaderless
的錯。實在不行,那就只能去zk上去看看到底broker是什么狀態了。
root@kafka-0:/usr/share/easemon/kafka/bin# ./zookeeper-shell.sh 10.10.18.3
ls /brokers/ids
[1012, 1013, 1014]
可以看到 有三個broker,id分別是 1012,1013,1014。然后接着看看這些broker注冊的IP地址(接着在 zookeeper-shell里執行命令):
get /brokers/ids/1012
{"jmx_port":-1,"timestamp":"1480404348685","endpoints":["PLAINTEXT://59.64.11.22:9092"],"host":"59.64.11.22","version":2,"port":9092}
看起來也符合預期,注冊到 59.64.11.22
的9092端口上了。
那就看看filebeat這個topic相關的信息吧:
/usr/share/easemon/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic filebeat
Topic:filebeat PartitionCount:3 ReplicationFactor:1 Configs:
Topic: filebeat Partition: 0 Leader: 1005 Replicas: 1005 Isr: 1005
Topic: filebeat Partition: 1 Leader: 1006 Replicas: 1006 Isr: 1006
Topic: filebeat Partition: 2 Leader: 1007 Replicas: 1007 Isr: 1007
等等,這里三個分區的leader怎么是1005,1006,1007呢,應該是1012,1013,1014才對。然后想到應該是Kafka的數據(通過配置項log.dirs指定位置)沒有放到持久存儲上,而是放到了Docker容器內,導致每次重新部署后,原來的數據都丟了,節點都注冊到新的brokerID上去了。修改完成后,重新部署,發現Kafka上終於有數據了。
關於 Broker id
多說兩句,我們環境中Kafka的Broker id是使用默認自動生成的id,所以都是1000以上的。而如果你的環境中如果是手工指定的,必須在1000以下,否則根據這篇文章,Kafka不報錯,會直接退出。
自動創建Topic
auto.create.topics.enable
參數可以用來配置Kafka Server是否自動創建topic,但這個是針對Producer而言的,如果Consumer消費某個不存在的topic時,是不會觸發自動創建的邏輯的。所以當Consumer消費某個不存在的topic時,由於具體的實現不一樣,可能會出現報錯的情況。
參考資料:
如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的“推薦”,讓更多人看到!

