前面分享了kafka的基本知識,下面就要對kafka進行實操,先看如何安裝。
kafka需要zookepper的支持,所以要安裝kafka需要有zookeeper的環境,zookeeper安裝請參見《zookeeper之二:手把手教你安裝zookeeper3.7.0(絕對實用)》。這里可以是一個單獨的zookepper的環境也可以使用kafka集成的zookeeper環境,建議使用單獨的zookeeper環境。本文以單獨環境為例,集成環境請留言。
1、下載
kafka的官網地址是:http://kafka.apache.org/
從官網下載最新的kafka的安裝包,如下圖
這里選擇二進制包進行安裝,
我這里已經下載好Scala2.12版本的kafka安裝包,
2、安裝
2.1、單機版
2.1.1、環境
os:在虛擬機環境中的centos7-64進行安裝。
zookeeper:zookeeper已經安裝好,是3.7的版本。環境在本機:127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184
JDK:JDK環境已安裝,是1.8的版本
2.1.2、修改配置文件
kafka的安裝包解壓后的目錄結構如下,
bin目錄下存放的是kafka的腳本文件,包括啟動、停止、客戶端、消費端等;
config目錄下存放的是和配置相關的文件,包括客戶端、消費端、服務器、zookeeper的配置等;
libs目錄存放的是kafka啟動用到的文件及依賴;
logs目錄存放的是kafka的運行時日志文件;
了解完了kafka的文件目錄,下面進行單機版kafka配置文件的修改。要修改的文件是config/server.properties文件,該文件比較長,這里就不貼了,看下幾個必要的配置。這里有必要提下kafka的配置文件做的很好,有詳細的解釋還有例子。
broker.id broker的id或者編號,在集群中該編號必須唯一
# The id of the broker. This must be set to a unique integer for each broker. broker.id=0
listeners kafka服務器監聽的端口,該端口也是對外提供服務的端口
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.117.128:9092
num.partitions topic下分區的數量
# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1
log.dirs 消息的存放目錄,這里看配置是日志的意思,因為kafka把消息使用日志的形式存儲,所以這里不要和kafka的運行日志相混淆。
# A comma separated list of directories under which to store log files log.dirs=/home/dev/datas/kafka_2.12-2.8.0/single
log.retentio.hours 消息保存的小時數
# The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168
default.replication.factor 消息的副本數量,這是kafka高可用、數據不丟失的關鍵
default.replication.factor=3
zookeeper.connect zookeeper的地址
# Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2182/kafka,localhost:2183/kafka,localhost:2184/kafka
配置完上面的配置項后就可以啟動單機版的kafka了
2.1.3、啟動
有了上面的配置,現在啟動kafka,在kafka的解壓目錄下執行,
bin/kafka-server-start.sh config/server.properties
執行后看到下面的日志打印,
看到是在前台啟動的,有沒有辦法后台啟動那,先停掉kafka,加上-daemon則使用守護線程在后台啟動。
bin/kafka-server-start.sh -daemon config/server.properties
2.2、集群版
2.2.1、環境
集群版的環境和單機版的環境是一樣的,需要zookeeper和JDK。由於集群版要求必須是2N+1台機器,我部署最少節點的集群,在虛擬機上起3個進程代表3台機器。
IP | id | 端口 | 日志路徑 |
本機 | 1 | 9093 | /home/dev/datas/kafka_2.12-2.8.0/cluster/node1 |
本機 | 2 | 9094 | /home/dev/datas/kafka_2.12-2.8.0/cluster/node2 |
本機 | 3 | 9095 | /home/dev/datas/kafka_2.12-2.8.0/cluster/node3 |
2.2.2、修改配置文件
配置項和單機版的基本相同,選擇在config下cluster文件夾,其下放置3個配置文件,
每個文件的內容可參考單機的,這里不再貼出。
關於日志路徑也是要事先建好相應的目錄。
2.2.3、啟動
啟動命令和單機的也是相同的,選擇在后台啟動,每次啟動指定不同的配置文件,
[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-server-start.sh -daemon config/cluster/server_node3.properties
3、測試
3.1、單機版測試
參見3.2集群版測試方法
3.2、集群版測試
3.2.1、通過查看進程的方式
使用jps命令查看進程
[dev@localhost kafka_2.12-2.8.0]$ jps
可以看到下面的打印
說明啟動了三個kafka進程
3.2.2、使用客戶端方式
使用kafka自帶的客戶端來創建一個topic
[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.117.128:9093 --create --topic testKafka
看下圖
說明創建成功,現在在另外一個節點上查看
[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.117.128:9095 --list
看下圖
可以看到剛才創建的testKafka這個topic。
有不正之處,歡迎指正。參考:
https://www.cnblogs.com/zhaoshizi/p/12154518.html
https://www.cnblogs.com/sandea/p/12078442.html