Kafka Cluster Setup
This chapter explains how to install a Kafka cluster made up of three machines; once the setup is complete, the cluster is ready for production use.
The three servers' IPs: 192.168.0.104, 192.168.0.105, 192.168.0.106
Required packages:
1. JDK 1.8: jdk-8u211-linux-x64.tar
2. ZooKeeper: zookeeper-3.4.14.tar
3. Kafka: kafka_2.11-2.1.1.tgz
(I) ZooKeeper setup (run on all three machines)
1. Environment
192.168.0.104 server1
192.168.0.105 server2
192.168.0.106 server3
Create /opt/kafka and copy the installation packages into it:
cd /opt
mkdir kafka
2. Install JDK 1.8
(1) Extract the package
tar -xvf jdk-8u211-linux-x64.tar
(2) Move it to the install directory
mv jdk1.8.0_211 /usr/local
(3) Set the environment variables
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_211
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
source /etc/profile   # apply the changes
(4) Verify the installation
cd /
echo $JAVA_HOME
echo $PATH
echo $CLASSPATH
java -version   # check the version
3. Create the directories
mkdir /opt/zookeeper            # project directory
mkdir /opt/zookeeper/zkdata     # snapshot logs
mkdir /opt/zookeeper/zkdatalog  # transaction logs
Put zookeeper-3.4.14.tar under /opt/zookeeper/ and extract it:
cp zookeeper-3.4.14.tar /opt/zookeeper/
tar -xvf zookeeper-3.4.14.tar
4. Edit the configuration
cd /opt/zookeeper/zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# Add the following:
tickTime=2000    # basic time unit in ms, used for heartbeats
initLimit=10     # max ticks a follower may take to connect and sync with the leader
syncLimit=5      # max ticks a follower may lag behind the leader
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=12181
# server.N=ip:peerPort:electionPort
server.1=192.168.0.104:12888:13888
server.2=192.168.0.105:12888:13888
server.3=192.168.0.106:12888:13888
5. Create the myid file (run only the line that matches the server you are on)
#server1
echo "1" > /opt/zookeeper/zkdata/myid
#server2
echo "2" > /opt/zookeeper/zkdata/myid
#server3
echo "3" > /opt/zookeeper/zkdata/myid
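Instead of typing a different echo on each box, this step can be scripted so the same command runs unchanged everywhere; a minimal sketch, assuming the IP-to-id mapping used in this guide (`myid_for_ip` is a hypothetical helper, not part of ZooKeeper):

```shell
# Hypothetical helper: derive this host's myid from its IP so one script
# works on all three servers (mapping taken from the server.N lines above).
myid_for_ip() {
  case "$1" in
    192.168.0.104) echo 1 ;;
    192.168.0.105) echo 2 ;;
    192.168.0.106) echo 3 ;;
    *) echo "unknown host: $1" >&2; return 1 ;;
  esac
}

# On a real server you would write the result into the data directory, e.g.:
#   myid_for_ip "$(hostname -I | awk '{print $1}')" > /opt/zookeeper/zkdata/myid
myid_for_ip 192.168.0.106   # prints 3
```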
6. Key scripts
/opt/zookeeper/zookeeper-3.4.14/bin
zkServer.sh   # the main management script
zkEnv.sh      # sets up the environment variables used when the cluster starts
7. Start the service and check it
cd /opt/zookeeper/zookeeper-3.4.14/bin
# Start the service (on all three machines)
./zkServer.sh start
# Check the server status
./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower   # this node's role (leader or follower)
You can also check the ZooKeeper process with "jps"; QuorumPeerMain is the main class of the whole ZooKeeper process.
# run jps
[root@zhu bin]# jps
27813 QuorumPeerMain
27909 Jps
8. Install Scala 2.11 (the kafka_2.11 tarball already bundles the Scala runtime it needs, so this step is only required if you want the Scala tools themselves)
tar -xvf scala-2.11.12.tar
vi /etc/profile   # add the following:
export SCALA_HOME=/opt/kafka/scala-2.11.12
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile   # apply the changes
------------------------------- Kafka cluster installation -------------------------------------------------------------
Kafka log location: /opt/kafka/kafka_2.11-2.1.1/logs
(II) Kafka cluster setup (all three machines)
1. Create the log directory
mkdir -p /opt/kafka/kafkalogs
2. Extract the package
cd /opt/kafka
tar -zxvf kafka_2.11-2.1.1.tgz
3. Edit the configuration file
cd /opt/kafka/kafka_2.11-2.1.1/config
vi server.properties   # adjust to match your production environment
# Add the following; note that broker.id and the IP-bearing lines are the only values that differ between machines
# host 192.168.0.104
broker.id=104
listeners=PLAINTEXT://192.168.0.104:9092
port=9092
advertised.listeners=PLAINTEXT://192.168.0.104:9092
advertised.port=9092
host.name=192.168.0.104
advertised.host.name=192.168.0.104
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kafkalogs/
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.104:12181,192.168.0.105:12181,192.168.0.106:12181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
#inter.broker.protocol.version=0.9.0
#log.message.format.version=0.9.0
unclean.leader.election.enable=true
auto.create.topics.enable=true
default.replication.factor=3
# host 192.168.0.105
broker.id=105
listeners=PLAINTEXT://192.168.0.105:9092
port=9092
advertised.listeners=PLAINTEXT://192.168.0.105:9092
advertised.port=9092
host.name=192.168.0.105
advertised.host.name=192.168.0.105
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kafkalogs/
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.104:12181,192.168.0.105:12181,192.168.0.106:12181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
#inter.broker.protocol.version=0.9.0
#log.message.format.version=0.9.0
unclean.leader.election.enable=true
auto.create.topics.enable=true
default.replication.factor=3
# host 192.168.0.106
broker.id=106
listeners=PLAINTEXT://192.168.0.106:9092
port=9092
advertised.listeners=PLAINTEXT://192.168.0.106:9092
advertised.port=9092
host.name=192.168.0.106
advertised.host.name=192.168.0.106
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kafkalogs/
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.104:12181,192.168.0.105:12181,192.168.0.106:12181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
#inter.broker.protocol.version=0.9.0
#log.message.format.version=0.9.0
unclean.leader.election.enable=true
auto.create.topics.enable=true
default.replication.factor=3
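The three files above differ only in broker.id and the lines carrying the host IP; one way to avoid copy-paste mistakes is to render those lines from a shared template. A minimal sketch (the `{{IP}}`/`{{ID}}` placeholders and the `make_config` helper are illustrative, not part of Kafka):

```shell
# Illustrative only: render the per-host part of server.properties by
# substituting the host IP and broker id into a shared template.
make_config() {   # usage: make_config <ip> <broker-id>
  sed -e "s/{{IP}}/$1/g" -e "s/{{ID}}/$2/g" <<'EOF'
broker.id={{ID}}
listeners=PLAINTEXT://{{IP}}:9092
advertised.listeners=PLAINTEXT://{{IP}}:9092
host.name={{IP}}
advertised.host.name={{IP}}
EOF
}

make_config 192.168.0.104 104 > server-104.properties
```

The settings shared by all three brokers (threads, log retention, zookeeper.connect, and so on) would then be appended from a common fragment.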
4. Start the service and test it
Set the environment variables:
vi /etc/profile   # add the following:
export KAFKA_HOME=/opt/kafka/kafka_2.11-2.1.1
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile   # apply the changes
# Start
[root@zhu bin]# kafka-server-start.sh -daemon ../config/server.properties
[root@zhu bin]# jps
27813 QuorumPeerMain
28847 Kafka
28882 Jps
# Stop
[root@zhu bin]# kafka-server-stop.sh
[root@zhu bin]# jps   # run jps again to confirm the Kafka process is gone
------------------------------- Common operations -----------------------------------------------
5. Create a topic to verify the cluster works
# Create a topic
kafka-topics.sh --create --zookeeper 192.168.0.104:12181 --replication-factor 2 --partitions 2 --topic yc
# Explanation
--replication-factor 2   # keep 2 replicas of each partition
--partitions 2           # create 2 partitions
--topic yc               # the topic name is yc
Delete a topic:
kafka-topics.sh --delete --zookeeper 192.168.0.104:12181 --topic yc
6. Common commands
(1) List topics
kafka-topics.sh --list --zookeeper localhost:12181
# or: kafka-topics.sh --list --zookeeper 192.168.0.104:12181
# This prints every topic we have created
(2) Check a topic's status
[root@zhu bin]# kafka-topics.sh --describe --zookeeper localhost:12181 --topic yc
Topic:yc PartitionCount:2 ReplicationFactor:2 Configs:
Topic: yc Partition: 0 Leader: 104 Replicas: 104,105 Isr: 104,105
Topic: yc Partition: 1 Leader: 105 Replicas: 105,106 Isr: 105,106
# 2 partitions, replication factor 2; topic yc has partitions 0 and 1
# Field meanings:
Partition: the partition id
Leader: the broker id currently handling reads and writes for this partition
Replicas: the list of all brokers holding a replica of this partition
Isr: the subset of Replicas that is currently alive and in sync
(3) Reassign partitions
bin/kafka-reassign-partitions.sh
--zookeeper <urls>   the ZooKeeper connection string, in host:port format
--broker-list <brokerlist>   the brokers the partitions should be reassigned to, e.g. "0,1,2"
--topics-to-move-json-file <path>   a JSON file listing the topics to reassign. Exactly one of this option and --manual-assignment-json-file must be given.
The file format is: {"topics": [{"topic": "test"},{"topic": "test1"}], "version":1 }
--manual-assignment-json-file <path>   a JSON file describing a hand-written assignment. Exactly one of this option and --topics-to-move-json-file must be given.
The file format is: {"partitions": [{"topic": "test", "partition": 1, "replicas": [1,2,3] }], "version":1 }
--status-check-json-file <path>   a JSON file listing the partitions and the new replica list each should get; it can be produced from a dry run.
--execute   actually perform the reassignment; without this option the tool only does a dry run.
(Note: these option names date from older Kafka releases; in Kafka 2.x the same workflow is driven by --reassignment-json-file together with --generate/--execute/--verify, as used in section (7) below.)
Examples:
<1> Move the test and test1 topics onto the new brokers with ids 3 and 4:
./kafka-reassign-partitions.sh --zookeeper 172.1.1.1:2181 --broker-list "3,4" --topics-to-move-json-file topicMove.json --execute
topicMove.json contains: {"topics":[{"topic":"test"},{"topic":"test1"}],"version":1}
<2> Move partition 1 of the test topic onto brokers 1, 2 and 4:
./kafka-reassign-partitions.sh --zookeeper 172.1.1.1:2181 --broker-list "1,2,4" --manual-assignment-json-file manualAssignment.json --execute
manualAssignment.json contains:
{"partitions":[{"topic":"test","partition":1,"replicas":[1,2,4]}],"version":1}
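A malformed assignment file is a common failure mode with this tool, so it can help to write the JSON and sanity-check it before invoking the command; a small sketch (the file name follows the example above, and the checks are illustrative, not part of Kafka):

```shell
# Write the manual-assignment JSON on a single line, as the tool expects.
cat > manualAssignment.json <<'EOF'
{"partitions":[{"topic":"test","partition":1,"replicas":[1,2,4]}],"version":1}
EOF

# Cheap sanity checks before handing the file to kafka-reassign-partitions.sh:
# the file must be exactly one line and carry the expected version field.
test "$(wc -l < manualAssignment.json)" -eq 1
grep -q '"version":1' manualAssignment.json && echo "manualAssignment.json looks OK"
```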
(4) Increase a topic's partition count
Use the --alter option of kafka-topics.sh, e.g. to grow topic1 from 1 partition to 6:
./kafka-topics.sh --alter --topic topic1 --zookeeper localhost:12181 --partitions 6
(5) Manually rebalance a topic, electing each partition's preferred replica as leader
./kafka-preferred-replica-election.sh
--zookeeper   the ZooKeeper connection string, in host:port format
--path-to-json-file   a file listing the partitions that need a new leader election, in the format
{"partitions": [{"topic": "test", "partition": 1},{"topic": "test", "partition": 2}]}
The default is all existing partitions.
Examples:
<1> ./kafka-preferred-replica-election.sh --zookeeper 172.1.1.1:2181
<2> ./kafka-preferred-replica-election.sh --zookeeper 172.1.1.1:2181 --path-to-json-file partitionList.json
partitionList.json contains: {"partitions":[{"topic":"test","partition":1},{"topic":"test","partition":2}]}
(6) Check a consumer group's consumption and lag
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.104:9092,192.168.0.105:9092,192.168.0.106:9092 --describe --group group_test1
(The --new-consumer flag and the plaintext:// prefixes belonged to older releases; in Kafka 2.1 --new-consumer has been removed and --bootstrap-server takes plain host:port addresses.)
(7) Dynamically add topic replicas
kafka-reassign-partitions.sh has three modes:
1. generate: given the topics to reassign, produce a reassignment plan (without executing it)
2. execute: perform the reassignment according to the given plan
3. verify: check whether the reassignment has completed
./bin/kafka-reassign-partitions.sh --zookeeper localhost:12181 --reassignment-json-file replication.json --verify
# replication.json contains (it must be written on a single line, otherwise the tool may reject it):
{"partitions":[{"topic":"topic_test1","partition":0,"replicas":[104,105,106]},{"topic":"topic_test1","partition":1,"replicas":[104,105,106]},{"topic":"topic_test1","partition":2,"replicas":[104,105,106]},{"topic":"topic_test1","partition":3,"replicas":[104,105,106]}],"version":1}
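Hand-writing that long line is error-prone; it can instead be built with a loop. A sketch (`build_plan` is an illustrative helper; the --execute invocation in the comment is the step that would run before --verify):

```shell
# Build the reassignment plan for topic_test1 (partitions 0-3, each replicated
# onto brokers 104,105,106) as a single JSON line.
build_plan() {
  local parts="" p
  for p in 0 1 2 3; do
    parts="$parts{\"topic\":\"topic_test1\",\"partition\":$p,\"replicas\":[104,105,106]},"
  done
  # Strip the trailing comma and wrap in the outer object.
  printf '{"partitions":[%s],"version":1}\n' "${parts%,}"
}
build_plan > replication.json

# Apply the plan first, then check on its progress with --verify:
#   ./bin/kafka-reassign-partitions.sh --zookeeper localhost:12181 \
#     --reassignment-json-file replication.json --execute
```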