1、實踐場景

開始前的准備條件:
1) 確認各個節點的jdk版本,將jdk升級到和kafka配套的版本(解壓既完成安裝,修改/etc/profile下的JAVA_HOME,source /etc/profile,重啟后jdk生效)
2、單節點kafka實踐
1) 啟動zookeeper集群
各個節點上啟動zookeeper進程
# bin/zkServer.sh start
啟動后,查看各個節點的zookeeper狀態 (leader, follower etc)
#bin/zkServer.sh status
2) 配置kafka的zk集群
配置文件:config/zookeeper.properties
配置要點:zookeeper中snapshot數據的存放地址,和zookeeper集群中的配置保持一致

3) 配置本節點上要啟動的broker
配置文件: config/server.properties
配置要點:
broker.id
log.dirs (將在該路徑下自動創建partition目錄)
zookeeper.connect (broker要連接的zookeeper集群地址和port)
檢查broker發布給producer和consumer的主機名和端口號(9092)
配置后的參數如下:
# broker id, 每個broker的id必須是唯一的 broker.id=0 # kafka存放消息的目錄 log.dirs=/usr/local/src/kafka_2.11/tmp/ # broker連接的zk集群,節點間通過逗號分隔,默認zk開放給客戶端連接的端口號是2181 zookeeper.connect=master:2181,slave1:2181,slave2:2181

注意:
1) 默認Kafka會直接在ZooKeeper的根路徑下創建znode,這樣Kafka的ZooKeeper配置就會散落在根路徑下面
2) 可以指定kafka在zookeeper的某個路徑下去進行操作, 將server.properties中的zookeeper.connect修改為:
a. zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka
b. 同時手動在zookeeper中創建/kafka節點 (create /kafka)
4)啟動單個Broker
# ./bin/kafka-server-start.sh ./config/server.properties

5)查看當前kafka集群中的Topic
# ./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181 # ./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181/kafka # 如果修改了kafka在zookeeper下的znode節點路徑,則要在--zookeeper參數中跟上chroot路徑
6) 創建topic
創建Topic時,會根據broker的個數,對replication-factor進行校驗;
# ./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic mytopic
7) 查看topic描述信息
# ./bin/kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic mytopic

partition:0 該partition的編號,從0開始
leader:0 該partition的leader節點的broker.id = 0
replicas:0 表示partition落地的所有broker, 包括leader在內
isr:0 當前處於in-sync的replicas節點, 包括leader在內, broker.id = 0
8) zookeeper上將記錄kafka已經創建的topic

9)broker的log.dirs中將創建partition目錄
目錄名:topic名-partition編號 //編號從0開始
mytopic-0

10)Partition目錄下的segment文件

每一個segment,都會由 log文件,index文件 組成

解析:
xxxx.log -- 最初的log文件,文件名是初始偏移(base offset), 消息存儲在log文件
xxxx.index -- 最初的index文件,文件名是初始偏移(base offset),index則用於對應log文件內的消息檢索
offset, postion
offset, 消息計數; postion,消息頭在log文件中的起始位置(position從0開始)
index文件可以映射到內存,從index文件的名字可以判斷出某一個消息應該位於哪一個log文件

segment文件的默認最大size為1G,超過該size后才會創建新的segment文件,新的segment文件將稱為active segment, 數據是向active segment寫入
11) 調整Topic的partition個數
只能增加,不能減少
# ./bin/kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave2:2181 --topic mytopic --partitions 2

物理的partition目錄,也變為了2個

12)使用kafka提供的Producer客戶端,模擬消息發送
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
--broker-list: 該producer要向哪些broker推送消息, ip:port形式說明
--topic: 該producer要向哪一個Topic發送消息
啟動producer客戶端后,每行都會作為一條message, 存儲到該topic的partition文件下的log文件中

13) 查看log文件中,消息落地到了哪里
1) 由於消息中沒有指定key, kafka將針對每條message,隨機找一個partition進行存放
2)4條消息,存儲到了兩個分區mytopic-0和mytopic-1下
mytopic-0分區,log文件存儲了2條消息

mytopic-1分區,log文件存儲了2條消息

14)啟動Consumer, 模擬消息的消費
1、Consumer消費消息時,只需要指定topic+offset
2、通過zookeeper獲取topic的partition位於哪些broker上,各自的leader broker是誰,然后和leader broker連接,獲取數據
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
3、觀察多個partition下,consumer端數據的順序
topic有2個分區,consumer端得到的數據順序和發送端的不一定相同,但每個partiton內部的消息順序能夠保證(非全局有序,partition內部有序)

