一、下載
下載地址: http://kafka.apache.org/downloads.html
kafka目錄結構
目錄
說明
bin 操作kafka的可執行腳本,還包含windows下腳本 config 配置文件所在目錄 libs 依賴庫目錄 logs 日志數據目錄,目錄kafka把server端日志分為5種類型,
分為:server,request,state,log-cleaner,controller
二、安裝及啟動
1. 解壓
> tar -xzf kafka_2.9.1-0.8.2.2.tgz > cd kafkakafka_2.9.1-0.8.2.2
2. 配置
kafka最為重要三個配置依次為:broker.id、log.dir、zookeeper.connect
kafka server端config/server.properties參數說明參照: kafka主要配置
根據屬性說明完成配置:
broker.id = 1
port = 9092
host.name=h1(h1為本人配置的hostname)
log.dirs=/root/software/kafka/log
zookeeper.connect=h1:2181
配置zookeeper(假設您已經安裝了zookeeper,如果沒有安裝,參照 zookeeper集群環境安裝配置)
3.啟動服務
在啟動kafka時需要先啟動zookeeper服務
kafka啟動命令:
> bin/kafka-server-start.sh config/server.properties
通過jps命令可以查看zookeeper和kafka是否啟動成功,如下圖:
4.創建topic (此處創建是名稱為‘my-topic’的topic,它只有一個分區,一個副本)
> bin/kafka-topics.sh --create --zookeeper h1:2181 --replication-factor 1 --partitions 1 --topic my-topic
5.查看topic列表
> bin/kafka-topics.sh --list --zookeeper h1:2181
6.發送消息
Kafka 使用一個簡單的命令行producer,從文件中或者從標准輸入中讀取消息並發送到服務端。默認的每條命令將發送一條消息。
運行producer並在控制台中輸一些消息,這些消息將被發送到服務端:> bin/kafka-console-producer.sh --broker-list h1:9092 --topic my-topic
this is one message
this is another message
如圖:
7.啟動consumer
Kafka也有一個命令行consumer可以讀取消息並輸出到標准輸出:
> bin/kafka-console-consumer.sh --zookeeper h1:2181 --topic my-topic --from-beginning
this is one message
this is another message
如圖:
三、搭建一個多個broker的集群
剛才只是啟動了單個broker,現在啟動有3個broker組成的集群,這些broker節點也都是在本機上的:
1. 首先為每個節點編寫配置文件:> cp config/server.properties config/server1.properties > cp config/server.properties config/server2.properties
編輯新文件並設置如下配置
config/server1.properties: broker.id=1 port=9093 log.dirs=log.dirs=/root/software/kafka/log1 config/server2.properties: broker.id=2 port=9094 log.dirs=/root/software/kafka/log2
broker.id在集群中唯一的標注一個節點,因為在同一個機器上,所以必須制定不同的端口和日志文件,避免數據被覆蓋。
2. 之前我們已經啟動zookeeper和一個節點,現在只需要啟動這兩個新節點
> bin/kafka-server-start.sh config/server-1.properties & > bin/kafka-server-start.sh config/server-2.properties &
3. 創建一個擁有三個副本的topic: replicated-topic
> bin/kafka-topics.sh --create --zookeeper h1:2181 --replication-factor 3 --partitions 1 --topic replicated-topic
我們可以通過運行"describe topics"命令來查看每個節點信息
> bin/kafka-topics.sh --describe --zookeeper h1:2181 --topic replicated-topic
Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2![]()
下面解釋一下這些輸出。第一行是對所有分區的一個描述,然后每個分區都會對應一行,因為我們只有一個分區所以下面就只加了一行。
leader:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
replicas:列出了所有的副本節點,不管節點是否在服務中.
isr:是正在服務中的節點.
在我們的例子中,節點0是作為leader運行。我們可以對比一下看一下之前創建的my-topic
4. 發送接收消息如上步驟6、7,不再重復寫出
5. 現在測試一下多broker集群的容錯能力,通過"describe topics"命令知道 replicated-topic的leader為broker0,現在我們kill掉它
> ps -ef | grep server.properties
> kill –9 20549
另外一個節點被選做了leader,node 0不再出現在 in-sync 副本列表中:
通過測試驗證雖然最初負責續寫消息的leader down掉了,但之前的消息還是可以消費的: