一、環境准備
-
操作系統:Cent OS 7
-
Kafka版本:kafka_2.10
-
Kafka官網下載:請點擊
-
JDK版本:1.8.0_171
- zookeeper-3.4.10
二、kafka安裝配置
1、下載Kafka並解壓
下載:
curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.11-2.0.1.tgz
解壓:
tar zxvf kafka_2.11-2.0.1.tgz
2、Kafka目錄介紹
-
/bin 操作kafka的可執行腳本,還包含windows下腳本
-
/config 配置文件所在目錄
-
/libs 依賴庫目錄
-
/logs 日志數據目錄,目錄kafka把server端日志分為5種類型,分為:server,request,state,log-cleaner,controller
3、配置
-
配置zookeeper進入kafka安裝工程根目錄編輯config/server.properties
-
kafka最為重要三個配置依次為:broker.id、log.dir、zookeeper.connect,
-
其他kafka server端config/server.properties參數說明和解釋如下:
4、啟動Kafka
-
啟動
進入kafka目錄,敲入命令
bin/kafka-server-start.sh config/server.properties &
-
檢測2181與9092端口
netstat -tunlp|egrep "(2181|9092)" tcp 0 0 :::2181 :::* LISTEN 19787/java tcp 0 0 :::9092 :::* LISTEN 28094/java
說明:
Kafka的進程ID為28094,占用端口為9092
QuorumPeerMain為對應的zookeeper實例,進程ID為19787,在2181端口監聽
5、單機連通性測試
啟動2個XSHELL客戶端,一個用於生產者發送消息,一個用於消費者接受消息。
-
運行producer,隨機敲入幾個字符,相當於把這個敲入的字符消息發送給隊列。
bin/kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic test
說明:早版本的Kafka,–broker-list 192.168.1.181:9092需改為–zookeeper 192.168.1.181:2181
-
運行consumer,可以看到剛才發送的消息列表。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.105:9092 --topic test --from-beginning
說明:早版本的Kafka,--bootstrap-server 192.168.1.181:9092需改為–zookeeper 192.168.1.181:2181
注意:
producer,指定的Socket(192.168.1.181+9092),說明生產者的消息要發往kafka,也即是broker
consumer, 指定的Socket(192.168.1.181+2181),說明消費者的消息來自zookeeper(協調轉發)
上面的只是一個單個的broker,下面我們來實驗一個多broker的集群。
三、集群搭建
一、copy /kafka_2.11-2.0.1/config/server.properties文件修改以下三個地方(zk地址相同)
broker.id=1 listeners=PLAINTEXT://192.168.1.105:9092 zookeeper.connect=192.168.1.105:2181
二、啟動zk、啟動三個kafka
bin/kafka-server-start.sh ../config/server.properties &
三、集群測試
1、在broke1創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.1.105:2181 --replication-factor 1 --partitions 1 --topic test4
2、broke2和broke3主機上利用命令行工具創建一個consumer程序
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.106:9092 --topic test4 --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.107:9092 --topic test4 --from-beginning
3、在broke1主機上利用命令行工具創建一個producer程序
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test4
4、broke1輸入11、broke2和broke3同時收到11
四、Kafka常用命令
(1)啟動kafka
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
(2)查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
(3)控制台消費
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning
(4)創建生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(5)刪除topic
- 刪除kafka存儲目錄(server.properties文件log.dirs配置,默認為"/tmp/kafka-logs")相關topic目錄
- 如果配置了delete.topic.enable=true直接通過命令刪除,如果命令刪除不掉,直接通過zookeeper-client 刪除掉"/brokers/topics/"目錄下相關topic節點。
注意: 如果你要刪除一個topic並且重建,那么必須重新啟動kafka,否則新建的topic在zookeeper的/brokers/topics/test-topic/目錄下沒有partitions這個目錄,也就是沒有分區信息。