【1】Prerequisites
(1) The cluster needs at least 3 machines, and an odd number of nodes.
(2) Kafka requires a Java environment: JDK 1.8 or above.
(3) Installation package used in this walkthrough: kafka_2.13-2.8.0.tgz
(4) Assume the three servers are named kafka1, kafka2 and kafka3:
Server | IP | Domain
---|---|---
kafka1 | 192.168.175.132 | kafka1.sd.cn
kafka2 | 192.168.175.148 | kafka2.sd.cn
kafka3 | 192.168.175.147 | kafka3.sd.cn
JDK is already installed on these machines, so that step is skipped here.
【2】Building the ZK cluster
(From 2.8 onwards Kafka can apparently run without a ZK cluster, via KRaft, though it was still early-access at this release.)
(2.1) Edit the ZK configuration file
tar -zxf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
mkdir zk_kfk_data
cd ../
mv kafka_2.13-2.8.0 /data/kafka
cd /data/kafka
cd config
vim zookeeper.properties
---------------------------------------------------------
dataDir=/data/kafka/zk_kfk_data
# Directory where ZooKeeper stores its log files
#dataLogDir=/data/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=192.168.175.132:2888:3888
server.2=192.168.175.148:2888:3888
server.3=192.168.175.147:2888:3888
--------------------------------------------
The zookeeper.properties file is identical on all three machines. dataDir is ZooKeeper's data directory, and the server.1 / server.2 / server.3 lines describe the cluster membership.
Port 2888 is used for communication between the ZooKeeper servers (follower-to-leader traffic);
port 3888 is used for leader election between the ZooKeeper servers.
tickTime: the basic heartbeat interval.
It is the interval at which heartbeats are exchanged between ZooKeeper servers, and between clients and servers; one heartbeat is sent every tickTime.
tickTime is in milliseconds.
ZooKeeper also has a session concept between client and server, similar to sessions in web development, and the minimum session timeout ZooKeeper allows is twice the tickTime.
initLimit: the leader/follower initial sync limit.
The maximum number of heartbeats (tickTime intervals) a follower (F) may take for its initial connection to the leader (L).
syncLimit: the leader/follower request limit.
The maximum number of heartbeats (tickTime intervals) tolerated between a request and its acknowledgement when a follower (F) talks to the leader (L).
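The tick-based settings above are easier to reason about as wall-clock values; a small sketch using the numbers from this zookeeper.properties:

```python
# Translate the ZooKeeper timing settings into wall-clock values.
tick_time_ms = 2000   # tickTime: one heartbeat interval, in ms
init_limit = 10       # initLimit: ticks allowed for the initial follower->leader sync
sync_limit = 5        # syncLimit: ticks allowed for a request/ack round trip

# A follower has initLimit * tickTime to finish its initial sync:
init_window_s = init_limit * tick_time_ms / 1000
# ...and syncLimit * tickTime before a request is considered timed out:
sync_window_s = sync_limit * tick_time_ms / 1000
# The minimum session timeout is twice the tickTime:
min_session_ms = 2 * tick_time_ms

print(init_window_s, sync_window_s, min_session_ms)  # 20.0 10.0 4000
```

So with this configuration a follower gets 20 seconds to join the cluster and 10 seconds per request/ack exchange.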
(2.2) Create the myid file
Create the myid file: go to the /data/kafka/zk_kfk_data directory on each server and create a myid file, writing 1, 2 and 3 on the three servers respectively.
myid is the identifier the ZooKeeper cluster members use to recognize each other; it must exist and must be unique per node.
--------------
echo "1" > /data/kafka/zk_kfk_data/myid   # on kafka1 only
echo "2" > /data/kafka/zk_kfk_data/myid   # on kafka2 only
echo "3" > /data/kafka/zk_kfk_data/myid   # on kafka3 only
---------------------
(2.3) Set up environment variables
cd /data/kafka/bin/
echo "export PATH=${PATH}:`pwd`" >>/etc/profile
source /etc/profile
(2.4) Start the ZK cluster
nohup zookeeper-server-start.sh /data/kafka/config/zookeeper.properties >>/data/kafka/zookeeper.log &
less /data/kafka/zookeeper.log
Check the log for errors; if it is clean, you are good.
Then open ports 2181, 2888 and 3888 in the firewall.
(2.5) Log in to verify
zookeeper-shell.sh 192.168.175.132:2181
If the shell connects successfully, ZooKeeper is working.
【3】Kafka cluster
(3.1) Edit the configuration file server.properties
(1) Enter the directory: cd /data/kafka
(2) Create the Kafka data directory: mkdir kafka-logs
(3) Enter the config directory: cd /data/kafka/config
(4) Edit the server.properties configuration file
----------------------------------
broker.id=0
advertised.listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.175.132:2181,192.168.175.148:2181,192.168.175.147:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
------------------------
broker.id: must differ on each node; set it to 0, 1 and 2 respectively.
advertised.listeners: the listener address/port advertised to clients.
log.dirs: data (log segment) directory.
num.partitions: default number of partitions for new topics.
log.retention.hours: log retention time.
zookeeper.connect: ZooKeeper connection string; separate multiple addresses with commas.
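Only broker.id (and, if you advertise explicit addresses, advertised.listeners) differ between the three nodes. A minimal sketch of rendering those per-broker overrides; the explicit per-broker IPs in advertised.listeners are an assumption here (the config above leaves the host empty):

```python
# Hypothetical per-broker overrides, using the IPs from the table at the top.
brokers = {
    0: "192.168.175.132",
    1: "192.168.175.148",
    2: "192.168.175.147",
}

def render_overrides(broker_id, ip):
    # Only these two lines change per node; the rest of server.properties
    # is identical on all three machines.
    return (
        f"broker.id={broker_id}\n"
        f"advertised.listeners=PLAINTEXT://{ip}:9092\n"
    )

for bid, ip in brokers.items():
    print(f"--- broker {bid} ---")
    print(render_overrides(bid, ip))
```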
(3.2) Start the Kafka cluster
nohup kafka-server-start.sh /data/kafka/config/server.properties >>/data/kafka/kafka.log &
less /data/kafka/kafka.log
【4】Testing the Kafka cluster
(4.1) Create a topic: test
kafka-topics.sh --help
kafka-topics.sh --create --zookeeper 192.168.175.132:2181,192.168.175.148:2181,192.168.175.147:2181 --partitions 3 --replication-factor 1 --topic test
With --replication-factor 2 each partition gets 1 follower replica in addition to the leader; with a factor of 1 there is only the partition itself. In general a replication factor of N yields N-1 follower replicas; a factor of 3 means 1 leader plus 2 followers.
Result:
Created topic test.
(4.2) View the topic
kafka-topics.sh --list --zookeeper 192.168.175.147:2181
Result: test
kafka-topics.sh --describe --zookeeper 192.168.175.147:2181
(4.3) Simulate publish/subscribe clients
Publish: kafka-console-producer.sh --bootstrap-server 192.168.175.132:9092,192.168.175.148:9092,192.168.175.147:9092 --topic test
Subscribe: kafka-console-consumer.sh --bootstrap-server 192.168.175.132:9092,192.168.175.148:9092,192.168.175.147:9092 --from-beginning --topic test
【5】Replica expansion/shrinking, partition migration, cross-path migration: kafka-reassign-partitions
Detailed reference: https://mp.weixin.qq.com/s/fQ03wpctV1dGnmk1r-xEWA
Script parameters
Parameter | Description | Example
---|---|---
--zookeeper (deprecated in newer versions) | Connect to ZK | --zookeeper localhost:2181,localhost:2182
--bootstrap-server (use in newer versions) | Connect to Kafka | --bootstrap-server localhost:9092,x.x.x.x:9092
--topics-to-move-json-file | Specify a JSON file whose content lists the topics to move |
--generate | Propose a replica reassignment plan; this command does not actually execute anything |
--broker-list | Specify the broker list used when proposing an assignment; used together with --generate | --broker-list 0,1,2,3
--reassignment-json-file | Specify the JSON file describing the reassignment; used together with --execute | JSON format as in the examples below
--execute | Start the reassignment task; used together with --reassignment-json-file |
--verify | Check whether the task completed; if --throttle was used, this command also removes the throttle. This step matters: leaving the throttle in place hampers normal replica synchronization |
--throttle | Cap the broker-to-broker transfer rate during migration, in bytes/sec | --throttle 500000
--replica-alter-log-dirs-throttle | Cap the bandwidth of intra-broker cross-path data copies (from one log dir to another), in bytes/sec | --replica-alter-log-dirs-throttle 100000
--disable-rack-aware | Disable rack awareness; rack information is ignored during assignment |
--bootstrap-server | Required when doing cross-path replica migration |
(5.1) How to use the script
Key parameter: --generate
Step 1: build the topics file
cd /data/kafka
vim move.json
{ "topics": [ {"topic": "test"} ], "version": 1 }
Running with --generate prints the current replica assignment as JSON, plus a suggested reassignment JSON.
The broker-list value can be obtained as follows:
[root@DB6 /data/kafka]$ kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Dynamic configs for broker 0 are:
Dynamic configs for broker 1 are:
Dynamic configs for broker 2 are:
Default configs for brokers in the cluster are:
The query shows the cluster's broker list is 0,1,2, so we run --generate to produce the current assignment JSON and a suggested one:
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ./move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
I ran the JSON through an online formatter to compare;
the comparison shows that the replicas of partitions 1 and 2 have changed.
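The same comparison can be done programmatically; a minimal sketch using the replica lists from the --generate output above:

```python
# Compare the "Current" and "Proposed" assignments printed by --generate,
# and list which partitions would change. Keys are partition numbers,
# values are the replica lists copied from the output above.
current = {0: [1, 0, 2], 1: [0, 2, 1], 2: [2, 1, 0]}
proposed = {0: [1, 0, 2], 1: [2, 1, 0], 2: [0, 2, 1]}

changed = [p for p in sorted(current) if current[p] != proposed[p]]
print(changed)  # [1, 2] -> partitions 1 and 2 get a new replica order
```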
(5.2) Execute the JSON file
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./reassignment.json --execute
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./reassignment.json --execute
Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-0,test-1,test-2
Comparing the results (before the operation on top, after on the bottom), the replica positions clearly changed, but the ISR did not.
Watch out for the traffic spike a migration can cause on the cluster. Kafka provides a throttle on broker-to-broker replication traffic, capping the bandwidth used to move replicas from one machine to another.
This is useful when rebalancing the cluster, bootstrapping a new broker, or adding and removing brokers,
because it bounds these data-intensive operations and thereby limits the impact on users. For example, add a throttle option to the migration above: --throttle 50000000
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./reassignment.json --execute --throttle 50000000
With --throttle 50000000 appended, the partition movement is limited to 50000000 B/s.
With the parameter added you will see:
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
Note:
if your migration includes cross-path replica moves (multiple log dirs on the same broker), this throttle does not apply to them;
you additionally need the --replica-alter-log-dirs-throttle
parameter, which limits the traffic of migrations between different paths inside the same broker.
(5.3) Verify
Key parameter: --verify
This option checks the status of the reassignment, and also removes any --throttle
limit that was set; otherwise regular replication traffic could remain throttled.
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --verify
If the topic does not exist, the command reports an error.
【6】Shrinking replicas in practice
When the new assignment has fewer replicas than before, the extra replicas are deleted. For example, we currently have 3 replicas and want to go back to 2, removing the replica on the node with broker id 2.
(6.1) Use --generate as in (5.1) to obtain the current configuration JSON
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ./move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
We transform this to construct reassignment.json:
remove every 2 from the replicas lists; log_dirs then also has only 2 entries per partition.
{
  "version": 1,
  "partitions": [
    { "topic": "test", "partition": 0, "replicas": [1, 0], "log_dirs": ["any", "any"] },
    { "topic": "test", "partition": 1, "replicas": [1, 0], "log_dirs": ["any", "any"] },
    { "topic": "test", "partition": 2, "replicas": [0, 1], "log_dirs": ["any", "any"] }
  ]
}
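The JSON above can also be built programmatically instead of by hand; a sketch that starts from the current assignment printed by --generate and drops broker 2 everywhere:

```python
import json

# Current assignment copied from the --generate output above.
current = {"version": 1, "partitions": [
    {"topic": "test", "partition": 0, "replicas": [1, 0, 2]},
    {"topic": "test", "partition": 1, "replicas": [2, 1, 0]},
    {"topic": "test", "partition": 2, "replicas": [0, 2, 1]},
]}

REMOVE = 2  # broker id whose replicas we want to delete
for p in current["partitions"]:
    p["replicas"] = [r for r in p["replicas"] if r != REMOVE]
    # log_dirs must have exactly one entry per remaining replica:
    p["log_dirs"] = ["any"] * len(p["replicas"])

print(json.dumps(current))  # write this out as reassignment.json
```

Note that removing a broker can also change a partition's preferred leader (the first entry in its replicas list).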
(6.2) Execute the shrink
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --execute --throttle 50000000
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run --verify periodically, until the reassignment completes, to ensure the throttle is removed.
The inter-broker throttle limit was set to 50000000 B/s
Successfully started partition reassignments for test-0,test-1,test-2

-- as advised, run --verify to confirm completion and make sure the throttle is removed:
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-0 is complete.
Reassignment of partition test-1 is complete.
Reassignment of partition test-2 is complete.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic test
After shrinking, everything looks fine.
Topic state before and after the operation: success.
I also checked the node with broker id 2: the partition directories are indeed gone (screenshots omitted here).
【7】Expanding replicas in practice
Add one replica per partition.
(7.1) Use --generate as in (5.1) to obtain the current configuration JSON
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ./move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
[root@DB6 /data/kafka]$
We transform this to construct reassignment.json:
{
  "version": 1,
  "partitions": [
    { "topic": "test", "partition": 0, "replicas": [2, 1, 0], "log_dirs": ["any", "any", "any"] },
    { "topic": "test", "partition": 1, "replicas": [1, 0, 2], "log_dirs": ["any", "any", "any"] },
    { "topic": "test", "partition": 2, "replicas": [0, 1, 2], "log_dirs": ["any", "any", "any"] }
  ]
}
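One simple policy for adding a replica is to append the new broker to each partition's replica list; a sketch of that policy. Note the first entry in each list is the preferred leader, so placement matters; the hand-written JSON above reorders some lists to spread preferred leaders across brokers, while this sketch just appends:

```python
import json

# Current 2-replica assignment copied from the --generate output above.
current = {"version": 1, "partitions": [
    {"topic": "test", "partition": 0, "replicas": [1, 0]},
    {"topic": "test", "partition": 1, "replicas": [1, 0]},
    {"topic": "test", "partition": 2, "replicas": [0, 1]},
]}

ADD = 2  # broker id to receive the new replicas
for p in current["partitions"]:
    if ADD not in p["replicas"]:
        p["replicas"].append(ADD)  # appended, so broker 2 never becomes preferred leader
    p["log_dirs"] = ["any"] * len(p["replicas"])

print(json.dumps(current))
```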
(7.2) Start the expansion
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-0,test-1,test-2
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-0 is complete.
Reassignment of partition test-1 is complete.
Reassignment of partition test-2 is complete.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic test
Comparing the results, we notice the leader distribution is off;
trigger a new preferred-leader election:
[root@DB6 /data/kafka]$ kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
Successfully completed leader election (PREFERRED) for partitions test-1, test-0
The leaders are now evenly distributed; done.
【8】Cross-path replica migration
Why do the disks on production Kafka machines fill up unevenly, often leaning heavily to one side? Because Kafka only guarantees that the number of partitions is spread evenly across disks; it has no knowledge of how much space each partition actually occupies, so partitions with huge message volumes can easily consume a disproportionate amount of one disk. Before version 1.1 users could do nothing about this, since Kafka only supported reassigning partition data between brokers, not between different disks on the same broker. Version 1.1 officially added support for migrating replicas between paths.
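A toy illustration of that point: partition counts balance perfectly under round-robin placement, yet bytes per disk can still skew badly. The partition sizes below are hypothetical:

```python
# 8 partitions with very different (hypothetical) sizes, in GB.
sizes = [90, 5, 80, 10, 70, 8, 60, 12]
disks = {"disk1": [], "disk2": []}

# Round-robin placement: partition *counts* stay perfectly even.
for i, size in enumerate(sizes):
    target = "disk1" if i % 2 == 0 else "disk2"
    disks[target].append(size)

counts = {d: len(v) for d, v in disks.items()}
usage = {d: sum(v) for d, v in disks.items()}
print(counts)  # {'disk1': 4, 'disk2': 4}   -> balanced counts
print(usage)   # {'disk1': 300, 'disk2': 35} -> wildly unbalanced bytes
```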
How do you store partitions under multiple paths on one broker?
Just list multiple directories in the configuration:
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8
Note that the different paths on one broker only hold different partitions; replicas of the same partition are never placed on the same broker, otherwise the replica would be useless for fault tolerance.
How do you trigger a cross-path migration?
The migration JSON file has a log_dirs field; if omitted from the request it defaults to "log_dirs": ["any"]
(the array length must match the number of replicas). To migrate across paths, simply put absolute paths there instead, as in the example below.
Example migration JSON file:
{
"version": 1,
"partitions": [{
"topic": "test_create_topic4",
"partition": 2,
"replicas": [0],
"log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"]
}, {
"topic": "test_create_topic4",
"partition": 1,
"replicas": [0],
"log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"]
}]
}
Then run the script:
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-server xxxxx:9092 --replica-alter-log-dirs-throttle 10000
Note: for cross-path migration, --bootstrap-server must be passed.
If you need throttling, add the --replica-alter-log-dirs-throttle parameter; unlike --throttle, it limits the migration traffic between different paths inside a broker.
【9】Expanding/shrinking topic partitions
(9.1) Shrinking partitions => not allowed
[root@DB6 /data/kafka]$ kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test --partitions 2
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2.
[2021-09-03 18:03:17,386] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2. (kafka.admin.TopicCommand$)
[root@DB6 /data/kafka]$
So this direct approach does not work; you can shrink via the method in 【6】 instead.
(9.2) Expanding partitions => allowed
(my hands-on run)
Topic: test2  TopicId: yF8LT56YSXyB3U0PLzmUBQ  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test2  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
Add partitions: [root@DB6 /data/kafka]$ kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test2 --partitions 3
Topic: test2  TopicId: yF8LT56YSXyB3U0PLzmUBQ  PartitionCount: 3  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test2  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
Add partitions pinned to specific nodes:
[root@DB6 /data/kafka]$ kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test2 --partitions 6 --replica-assignment 0,1,2,0,2,2
[root@DB6 /data/kafka]$ kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic "test2"
Topic: test2  TopicId: yF8LT56YSXyB3U0PLzmUBQ  PartitionCount: 6  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test2  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 3  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 4  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 5  Leader: 2  Replicas: 2  Isr: 2
(9.3) Partition migration
For partition migration, see 【5】/【6】/【7】.
Reference:
ZK style (not recommended):
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
Kafka version >= 2.2 supports the following style (recommended):
Expand a single topic:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
Batch expansion (expand every topic matched by the regular expression to 4 partitions):
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
The regular expression ".*?" matches everything; adjust it as needed.
PS: if a topic already has as many or more partitions than requested, it throws an exception, but the other topics proceed normally.
Related optional parameter:
Parameter | Description | Example
---|---|---
--replica-assignment | Replica/partition assignment; when creating (or expanding) a topic you can specify the replica placement yourself | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0
The example means three partitions with three replicas each, mapped to the given brokers; commas separate partitions, colons separate the replicas within a partition.
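The comma/colon format can be summed up as a tiny parser; a sketch (the helper name is mine, not a Kafka API):

```python
# Parse the --replica-assignment format: commas separate partitions,
# colons separate the replicas of one partition.
def parse_replica_assignment(spec):
    return [[int(b) for b in part.split(":")] for part in spec.split(",")]

assignment = parse_replica_assignment("0:1:2,1:2:0,2:1:0")
print(assignment)  # [[0, 1, 2], [1, 2, 0], [2, 1, 0]]
# -> 3 partitions, 3 replicas each; the first broker in each inner list
#    is that partition's preferred leader.
```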
PS: although this option specifies the assignment for all partitions, only the newly added partitions actually take effect from it. For example, a topic with 3 partitions and 1 replica used to look like this:
Broker-1 | Broker-2 | Broker-3 | Broker-4
---|---|---|---
0 | 1 | 2 |
Now add one partition with --replica-assignment 2,1,3,4; read literally, this looks like it would also swap partitions 0 and 1 between brokers:
Broker-1 | Broker-2 | Broker-3 | Broker-4
---|---|---|---
1 | 0 | 2 | 3
But in practice that does not happen: the Controller truncates the first 3 entries and only applies the assignment for the new partition, leaving the existing ones unchanged:
Broker-1 | Broker-2 | Broker-3 | Broker-4
---|---|---|---
0 | 1 | 2 | 3
【Troubleshooting】
(1) serverid null is not a number
Under the dataDir specified in the ZK configuration file there is a myid file; its value must not be empty, and must not duplicate another ZK node's.
Fix the value and it works.