一、安裝
kafka可以通過官網下載:https://kafka.apache.org/downloads
kafka根據Scala版本不同,又分為多個版本,我不需要使用Scala,所以就下載官方推薦版本kafka_2.12-2.4.0.tgz。
使用tar -xzvf kafka_2.12-2.4.0.tgz 解壓
為了使用方便,可以創建軟鏈接kafka0
二、Zookeeper配置
當前下載的kafka程序里自帶Zookeeper,可以直接使用其自帶的Zookeeper建立集群,也可以單獨使用Zookeeper安裝文件建立集群。
1. 單獨使用Zookeeper安裝文件建立集群
Zookeeper的安裝及配置可以參考另一篇博客,里面有詳細介紹
https://www.cnblogs.com/zhaoshizi/p/12105143.html
2. 直接使用其自帶的Zookeeper建立集群
kafka自帶的Zookeeper程序腳本與配置文件名與原生Zookeeper稍有不同。
kafka自帶的Zookeeper程序使用bin/zookeeper-server-start.sh,以及bin/zookeeper-server-stop.sh來啟動和停止Zookeeper。
而Zookeeper的配制文件是config/zookeeper.properties,可以修改其中的參數
(1) 啟動Zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
加-daemon參數,可以在后台啟動Zookeeper,輸出的信息在保存在執行目錄的logs/zookeeper.out文件中。
對於小內存的服務器,啟動時有可能會出現如下錯誤
os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)
可以通過修改bin/zookeeper-server-start.sh中的參數,來減少內存的使用,將下圖中的-Xmx512M -Xms512M改小。
(2)關閉Zookeeper
bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties
三、kafka配置
kafka的配置文件在config/server.properties文件中,主要修改參數如下,更具體的參數說明以后再整理下。
broker.id是kafka broker的編號,集群里每個broker的id需不同。我是從0開始。
listeners是監聽地址,需要提供外網服務的話,要設置本地的IP地址
log.dirs是日志目錄,需要設置
設置Zookeeper集群地址,我是在同一個服務器上搭建了kafka和Zookeeper,所以填的本地地址
num.partitions 為新建Topic的默認Partition數量,partition數量提升,一定程度上可以提升並發性
內部topic配置
內部__consumer_offsets和__transaction_state兩個topic,分組元數據的復制因子,為了保證可用性,在生產上建議設置大於1。
default.replication.factor為kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務,是在自動創建topic時的默認副本數,可以設置為3
因為要創建kafka集群,所以kafka的所有文件都復制兩份,配置文件做相應的修改,尤其是brokerid、IP地址和日志目錄。分別創建軟鏈接kafka1和kafka2。
四、啟動及停止kafka
1. 啟動kafka
bin/kafka-server-start.sh -daemon config/server.properties
-daemon 參數會將任務轉入后台運行,輸出日志信息將寫入日志文件,日志文件在執行命令的目錄下的logs目錄中kafkaServer.out,結尾輸同started說明啟動成功。
也可以用jps命令,看有沒有kafka的進程
2. 停止kafka
bin/kafka-server-stop.sh config/server.properties
五、測試
kafka和Zookeeper已啟動完成
1. 創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.202.128:2181 --replication-factor 3 --partitions 3 --topic test
2. 查看主題
bin/kafka-topics.sh --list --zookeeper 192.168.202.128:2181
3. 發送消息
bin/kafka-console-producer.sh --broker-list 192.168.202.128:9094 --topic test
4. 接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --topic test --from-beginning
5. 查看特定主題的詳細信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
從中可以看到,test主題分了三個區,復制因子是3。
6. 刪除主題
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test