文章更新時間:2020/06/07
一、安裝JDK
過程就不過多介紹了...
二、安裝Zookeeper
安裝過程可以參考此處~
三、安裝並配置kafka
- Kafka下載地址 http://kafka.apache.org/downloads
- 解壓文件(我的目錄是E:\zhanghaoBF\kafka\kafka_2.11-2.3.0 【PS:這里不要在Program Files等文件名之間有空格的目錄下,不然一會執行會不識別路徑】)
- 打開目錄E:\zhanghaoBF\kafka\kafka_2.11-2.3.0\config下server.properties文件,把log.dirs修改為【log.dirs=D:\kafka_2.12-0.11.0.0\kafka-logs】,把listeners的配置放開
一些主要的配置信息如下:
#1、broker實例id 這個是kafka集群區分每個節點的唯一標志符 broker.id=0 #2、監聽的端口號 listeners=PLAINTEXT://:9092 #3、消息的存放路徑 log.dirs=E:\zhanghaoBF\【完成】SpringBoot整合kafka\kafka_2.11-2.3.0\zhanghaoBFkafkakafka_2.11-2.3.0kafka-logs #4、kafka消息的保留時間,默認為7天即168小時【單位為:小時】 log.retention.hours=168 #5、log文件存放着msg,這里設置的是log文件的最大大小,默認最大為1個G【單位為:byte】 log.segment.bytes=1073741824 #6、kafka鏈接的zk地址 zookeeper.connect=localhost:2181 #7、kafka鏈接的超時時間 zookeeper.connection.timeout.ms=6000 #8、默認的分區數量 num.partitions=1
四、啟動並測試kafka【以windows操作示例】
啟動ZK
啟動kafka
進入kafka文件目錄E:\zhanghaoBF\kafka\kafka_2.11-2.3.0,執行以下命令,啟動kafka通訊的服務器broker
.\bin\windows\kafka-server-start.bat .\config\server.properties
創建topic
進入kafka文件目錄E:\zhanghaoBF\kafka\kafka_2.11-2.3.0,輸入以下命令,創建kafka的消息topics【topic名為testDemo】
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testDemo
replication-factor:表示在不同的broker中保存幾份副本。設置為1表示該topic除了當前leader主機外,在其他brocker上還有1個副本。
partitions:表示分區數。設置為1表示該topic(同一個brocker上)存在有多少個分區。PS:每個分區的數據是不重復的,用於提高吞吐量。
查看topic
./kafka-topics.bat --list --zookeeper localhost:2181 ##查看localhost:2181上存在的topic信息
啟動生產者
進入目錄kafka文件目錄E:\zhanghaoBF\kafka\kafka_2.11-2.3.0,輸入以下命令,創建Producer
##此命令為修改命令行窗口的名稱,此命令可不執行,直接執行下面的命令去啟動生產者
title producer
kafka-console-producer.bat --broker-list localhost:9092 --topic testDemo
啟動完成后可在窗口中輸入消息內容:
啟動消費者
進入目錄kafka文件目錄E:\zhanghaoBF\kafka\kafka_2.11-2.3.0,輸入以下命令,創建consumer
##此命令為修改命令行窗口的名稱,此命令可不執行,直接執行下面的命令去啟動消費者
title consumer
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testDemo --from-beginning
啟動完成后可在窗口中輸入消息內容:
PS:某些博文里會推薦使用
kafka-console-consumer.bat --zookeeper localhost:2181 --topic testDemo
這個命令來啟動消費者,這里說明一下,低版本的kafka是用這個命令來啟動消費者並進行消息消費的,由於本文使用的kafka版本較高,所以才用的上面那個命令行進行消費者啟動,並消費信息。