因為kafka是基於Zookeeper的,而Zookeeper一般都是一個分布式的集群,盡管kafka有自帶Zookeeper,但是一般不使用自帶的,都是使用外部安裝的,所以首先我們需要安裝Zookeeper,可以參考:Zookeeper基礎教程(二):Zookeeper安裝
Zookeeper集群地址:
# 192.168.209.133 test1 # 192.168.209.134 test2
# 192.168.209.135 test3
為了方便,我這里也使用這三個地址安裝部署kafka,注意,安裝kafka之前需確保已安裝了jdk!
首先,前往Kafka官網下載Kafka的安裝包:http://kafka.apache.org/downloads.html
可以現在windows瀏覽器上下載好,然后將文件發到linux服務器下,博主使用的linux服務器版本是Ubuntu16.04的server版
下載完成之后,將這個tgz壓縮包傳到linux上去(可以使用xshell,或者filezilla也可以)
# 在tgz壓縮包所在目錄進行解壓,-C 表示解壓后的文件所存放的路徑,這里表示解壓后的文件放到/opt目錄下
sudo tar -zxvf kafka_2.12-2.5.0.tgz -C /opt/
# 進入kafka的配置目錄
cd /opt/kafka_2.12-2.5.0/config
# 其中server.properties是kafka的主要配置文件
sudo vim server.properties
server.properties常用參數介紹:
# 當前機器在集群中的唯一標識,和zookeeper的myid性質一樣,要求集群中每個broker.id都說不一樣的,可以從0開始遞增,也可以從1開始遞增 broker.id=0 # 監聽地址,需要提供外網服務的話,要設置本地的IP地址,當前kafka對外提供服務的端口默認是9092 listeners=PLAINTEXT://test1:9092 # 這個是borker進行網絡處理的線程數 num.network.threads=3 # 這個是borker進行I/O處理的線程數 num.io.threads=8 # 發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能 socket.send.buffer.bytes=102400 # kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤 socket.receive.buffer.bytes=102400 # 這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 socket.request.max.bytes=104857600 # 消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄, # 如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 log.dirs=/tmp/kafka-logs # 默認的分區數,一個topic默認1個分區數 num.partitions=1 # 每個數據目錄用來日志恢復的線程數目 num.recovery.threads.per.data.dir=1
# topic的offset的備份份數
offsets.topic.replication.factor=1
# 事務主題的復制因子(設置更高以確保可用性)。 內部主題創建將失敗,直到群集大小滿足此復制因素要求。
transaction.state.log.replication.factor=1
# 覆蓋事務主題的min.insync.replicas配置。
transaction.state.log.min.isr=1 # 默認消息的最大持久化時間,168小時,7天 log.retention.hours=168
# 日志達到刪除大小的閾值。每個topic下每個分區保存數據的最大文件大小;注意,這是每個分區的上限,因此這個數值乘以分區的個數就是每個topic保存的數據總量
log.retention.bytes=1073741824 # 這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.segment.bytes=1073741824 # 每隔300000毫秒去檢查上面配置的log失效時間 log.retention.check.interval.ms=300000 # 是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能 log.cleaner.enable=true # 設置zookeeper的連接端口,多個地址以逗號(,)隔開,后面可以跟一個kafka在Zookeeper中的根znode節點的路徑 zookeeper.connect=localhost:2181 # 設置zookeeper的連接超時時間 zookeeper.connection.timeout.ms=18000
# 在執行第一次再平衡之前,group協調員將等待更多消費者加入group的時間
group.initial.rebalance.delay.ms=0
我們可以根據自己的情況配置,比如我這里只需要配置:
# 192.168.209.133 test1配置
broker.id=0
listeners=PLAINTEXT://test1:9092
zookeeper.connect=test1:2181,test2:2181,test3:2181/kafka
# 192.168.209.134 test2配置
broker.id=1
listeners=PLAINTEXT://test2:9092
zookeeper.connect=test1:2181,test2:2181,test3:2181/kafka
# 192.168.209.135 test3配置
broker.id=2
listeners=PLAINTEXT://test3:9092
zookeeper.connect=test1:2181,test2:2181,test3:2181/kafka
現在就可以啟動Kafka了,注意,啟動前保證我們的Zookeeper是正常運行的!
# 啟動 ,可以加上-daemon表示后台啟動
sudo /opt/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.5.0/config/server.properties
# 停止kafka
sudo /opt/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
等三台kafka都啟動之后,使用ZooInspector連接Zookeeper可以看到多了一個路徑為/kafka的znode節點,啟動/kafka/brokers/ids的自己點就是我們上面配置的broker.id:
截圖中/kafka節點下的所有znode節點都是kafka所生成的,可以認為記錄的是kafka運行狀態的一些信息
如果啟動過程中報錯:kafka.common.InconsistentClusterIdException: The Cluster ID XXXXXXXXXXXXXX doesn't match stored clusterId Some(XXXXXXXXXXXXXXX) in meta.properties
可以前往server.properties中配置的log.dirs目錄下,找到meta.properties文件,將其中的cluster.id=XXXXXXXX給注釋掉,然后重新啟動kafka就可以了
創建Topic
使用kafka-topics.sh創建topic:
sudo /opt/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper test1:2181/kafka --replication-factor 3 --partitions 3 --topic test
# --create 表示創建,--delete 表示刪除 --describe 表示獲取詳情 --list 表示列出所有的topic
# --zookeeper 表示創建topic時連接Zookeeper所使用的的連接,注意,后面需要帶上根路徑名,這個和我們在server.properties中配置zookeeper.connect節點中的根路徑一致
# --replication-factor 創建的topic副本數,不能大於broker的個數,否則會拋出異常:ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: M larger than available brokers: N.
# --partitions topic的分區數,最好是等於broker數
# --topic 表示topic
這時刷新ZooInspector可以看到
發布消費消息
發布消息使用kafka-console-producer.sh
sudo /opt/kafka_2.12-2.5.0/bin/kafka-console-producer.sh --bootstrap-server test1:9092 --topic test
# --bootstrap-server 表示連接的kafka的broker,可以在ZooInspector中查看
執行命令后即進入命令行,可以輸入消息了:
消費消息使用kafka-console-consumer.sh
sudo /opt/kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server test1:9092 --topic test
# 可以增加--from-beginning參數表示從頭開始消費
上述命令執行后,就開始接收消息了,當重新發送一條消息,就會打印出來了:
使用kafkatool連接使用Kafka
首先下載kafkatool可以前往官網:https://www.kafkatool.com/download.html
或者在百度網盤下載:https://pan.baidu.com/s/1WVbRWW5thzJ9ZCGrimDekQ (提取碼: 3h88)
安裝后打開,File=>New Connection創建一個新連接:
kafkatool提供了兩種連接方式:
第一種是指定kafka在Zookeeper中的節點信息,然后kafkatool根據Zookeeper上的信息去獲取各個Broker的信息。
第二種方式是手動提供kafka的Broker地址和端口(不用提供全部Broker,部分也可以)。
填好信息后,可以點擊右下角的Test測試連接是否正常,確認正常后點擊Add添加連接。
連接上后,可以看到我們的Broker還有我們創建的Topic:
點擊指定的Topic可以查看這個Topic下的分區即消息等等:
注:如果您也像我這樣使用了hosts文件做了一層ip:hostname映射,可能導致在windows中無法連接或者Topic和Comsumers等操作失敗,可以試試在windows的hosts中配置ip:hostname