最近公司項目需要用到kafka消息隊列,所以特地花了一個周末的時間對kafka的搭建以及使用做了一個了解,特地記錄一下。
kafka搭建需要依賴於zookeeper作為注冊中心,前面兩篇文已經介紹了zookeeper的環境搭建,現在將記錄一下kafka的搭建,同樣也是選取三個節點
如下:
Node1:192.168.153.133
Node2:192.168.153.137
Node3:192.168.153.138
開始安裝kafka,參照官網地址:http://kafka.apachecn.org/quickstart.html
執行命令:
/#進入下載目錄 cd /usr/opt/ #下載kafka wget http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz #解壓文件並且重命名 tar -zxvf kafka_2.11-1.0.0.tgz && mv kafka_2.11-1.0.0 kafka
編輯配置文件(三台機器都是如此配置,記得修改ip以及broker.id):
#修改日志存儲地址 log.dirs=/usr/opt/kafka/data #節點id,用於區分子節點,就算是節點ip改變了,也能找到此節點 broker.id=1 #將使用外部的zookeeper配置 zookeeper.connect=192.168.153.133:2008,192.168.153.137:2008,192.168.153.138:2008 #指定主機地址,區分localhost,避免出錯 host.name=192.168.153.137 #監聽地址 listeners = PLAINTEXT://192.168.153.137:9092 #添加此項是為了api能夠監聽到此服務 advertised.listeners = PLAINTEXT://192.168.153.137:9092
配置kafka環境變量:
#編輯環境配置
vim /etc/profile
加入以下配置:
export PATH=$PATH:$ZOOKEEPER_HOME/bin
export KAFKA_HOME=/usr/opt/kafka
然后生效配置文件:
#環境配置文件生效
source /etc/profile
后台啟動kafka
./bin/kafka-server-start.sh -daemon config/server.properties
檢查kafka是否啟動成功:
jps
啟動成功之后顯示:

更多命令查看官網介紹:http://kafka.apachecn.org/quickstart.html
創建生產者
bin/kafka-topics.sh --create --zookeeper 192.168.153.137:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
測試:
查看主題列表 > bin/kafka-topics.sh --list --bootstrap-server 192.168.153.137:9092 啟動控制台消費者 > bin/kafka-console-consumer.sh --bootstrap-server 192.168.153.137:9092 --topic my-replicated-topic--from-beginning 啟動控制台消費者 > bin/kafka-console-consumer.sh --bootstrap-server 192.168.153.138:9092 --topic my-replicated-topic--from-beginning
在生產者發送一條消息,可以再消費者都接收到,成功!
另外出現Connection to node 1 (/192.168.153.133:9092) could not be established. Broker may not be available.錯誤的時候,可能是防火牆未關閉,或者是端口訪問不同,注意檢查,
還有就是kafka 發送消息 報錯 topic test with key: null
可能是9092端口未打開,此時只要
打開端口並關閉防火牆就行
firewall-cmd --zone=public --add-port=9092/tcp --permanent service firewalld stop
然后重啟kafka即可