02_Kafka單節點實踐


1、實踐場景

 

開始前的准備條件:

1) 確認各個節點的jdk版本,將jdk升級到和kafka配套的版本(解壓既完成安裝,修改/etc/profile下的JAVA_HOME,source /etc/profile,重啟后jdk生效)

 

2、單節點kafka實踐

1) 啟動zookeeper集群

各個節點上啟動zookeeper進程
# bin/zkServer.sh start


啟動后,查看各個節點的zookeeper狀態 (leader, follower etc)
#bin/zkServer.sh status

 

2) 配置kafka的zk集群

配置文件:config/zookeeper.properties

配置要點:zookeeper中snapshot數據的存放地址,和zookeeper集群中的配置保持一致

 

3) 配置本節點上要啟動的broker

配置文件: config/server.properties
配置要點

                   broker.id

                   log.dirs (將在該路徑下自動創建partition目錄)

                   zookeeper.connect (broker要連接的zookeeper集群地址和port)

                   檢查broker發布給producer和consumer的主機名和端口號(9092)

配置后的參數如下

# broker id, 每個broker的id必須是唯一的
broker.id=0 # kafka存放消息的目錄
log.dirs=/usr/local/src/kafka_2.11/tmp/ # broker連接的zk集群,節點間通過逗號分隔,默認zk開放給客戶端連接的端口號是2181
zookeeper.connect=master:2181,slave1:2181,slave2:2181

 

注意

1) 默認Kafka會直接在ZooKeeper的根路徑下創建znode,這樣Kafka的ZooKeeper配置就會散落在根路徑下面
2) 可以指定kafka在zookeeper的某個路徑下去進行操作, 將server.properties中的zookeeper.connect修改為:
       a. zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka
       b. 同時手動在zookeeper中創建/kafka節點 (create /kafka)

4)啟動單個Broker

# ./bin/kafka-server-start.sh   ./config/server.properties

 5)查看當前kafka集群中的Topic

# ./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
# ./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181/kafka   # 如果修改了kafka在zookeeper下的znode節點路徑,則要在--zookeeper參數中跟上chroot路徑

6) 創建topic

創建Topic時,會根據broker的個數,對replication-factor進行校驗;

# ./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic mytopic

7) 查看topic描述信息

# ./bin/kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic mytopic

 

partition:0        該partition的編號,從0開始
leader:0          該partition的leader節點的broker.id = 0
replicas:0       表示partition落地的所有broker, 包括leader在內
isr:0                當前處於in-sync的replicas節點, 包括leader在內, broker.id = 0

8) zookeeper上將記錄kafka已經創建的topic

9)broker的log.dirs中將創建partition目錄

目錄名topic名-partition編號       //編號從0開始

 

mytopic-0   

 

10)Partition目錄下的segment文件

 

每一個segment,都會由 log文件,index文件 組成

 

解析:

xxxx.log           -- 最初的log文件,文件名是初始偏移(base offset),   消息存儲在log文件
xxxx.index       -- 最初的index文件,文件名是初始偏移(base offset)index則用於對應log文件內的消息檢索
                             offset, postion
                             offset, 消息計數; postion,消息頭在log文件中的起始位置(position從0開始)
                             index文件可以映射到內存,從index文件的名字可以判斷出某一個消息應該位於哪一個log文件

 

segment文件的默認最大size為1G,超過該size后才會創建新的segment文件,新的segment文件將稱為active segment, 數據是向active segment寫入

 

11) 調整Topic的partition個數

只能增加,不能減少

# ./bin/kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave2:2181 --topic mytopic --partitions 2

 

 物理的partition目錄,也變為了2個

 

12)使用kafka提供的Producer客戶端,模擬消息發送

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic

--broker-list:      該producer要向哪些broker推送消息, ip:port形式說明
--topic:              該producer要向哪一個Topic發送消息

 

啟動producer客戶端后,每行都會作為一條message, 存儲到該topic的partition文件下的log文件中

 

13) 查看log文件中,消息落地到了哪里

1) 由於消息中沒有指定key,  kafka將針對每條message,隨機找一個partition進行存放

2)4條消息,存儲到了兩個分區mytopic-0和mytopic-1下

mytopic-0分區,log文件存儲了2條消息

 

mytopic-1分區,log文件存儲了2條消息

 

 14)啟動Consumer, 模擬消息的消費

1、Consumer消費消息時,只需要指定topic+offset
2、通過zookeeper獲取topic的partition位於哪些broker上,各自的leader broker是誰,然后和leader broker連接,獲取數據

 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning  

 

 3、觀察多個partition下,consumer端數據的順序

topic有2個分區,consumer端得到的數據順序和發送端的不一定相同,但每個partiton內部的消息順序能夠保證(非全局有序,partition內部有序)

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM