Installing a ZooKeeper + Kafka Cluster on CentOS 7



1  ZooKeeper and Kafka overview

1)  ZooKeeper

ZooKeeper is a distributed, hierarchical file system that promotes loose coupling between clients and provides an eventually consistent view; it is used to manage and coordinate the Kafka brokers. In a ZooKeeper ensemble, one server acts as the Leader and the rest as Followers.

2)  Apache Kafka

Kafka is a distributed publish-subscribe messaging system. Messages are categorized by Topic, and each topic is divided into multiple partitions. A message's position in the partition's log file is called its offset, a long integer that uniquely identifies the message.

Within a consumer group, each partition is consumed by only one consumer of that group; different groups consume independently of one another. A single consumer may, however, consume messages from several partitions.

Kafka only guarantees that the messages within a single partition are delivered to a consumer in order; viewed at the topic level, messages are not ordered.

Each partition has one server acting as its "leader", which handles all reads and writes for that partition. If the leader fails, one of the followers takes over as the new leader (elected via ZooKeeper); followers simply track the leader and replicate its messages. The leader therefore carries the full request load for its partition, so cluster-wide there are as many "leaders" as there are partitions, and Kafka spreads leadership evenly across the broker instances to keep overall performance stable.

2  Server addresses

The cluster is built from three servers:

Server1:192.168.89.11

Server2:192.168.89.12

Server3:192.168.89.13

3  Install the JDK

(JDK 1.8; the command below installs the OpenJDK 1.8 packages via yum)

# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

4  Build the ZooKeeper cluster

A ZooKeeper ensemble can serve clients only while more than half of its nodes are alive, so pick an odd number of hosts: 1, 3, 5, …

With 3 nodes, 1 may fail. Could an even number be used? It would work, but there is no benefit:

with 4 nodes, one failure leaves 3 alive, and a second failure drops the ensemble below a majority, exactly as with 3 nodes. Remember: strictly more than half must survive.
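The majority rule is easy to sanity-check: an ensemble of n nodes tolerates floor((n - 1) / 2) failures. A quick sketch with plain shell arithmetic:

```shell
# An ensemble of n ZooKeeper nodes stays available while a strict majority
# survives, i.e. it tolerates floor((n - 1) / 2) failures.
for n in 1 3 4 5; do
  echo "nodes=$n tolerates=$(( (n - 1) / 2 )) failure(s)"
done
```

Note that 4 nodes tolerate only 1 failure, the same as 3, which is why odd ensemble sizes are recommended.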

4.1 Download

http://mirrors.shu.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

4.2 Install ZooKeeper on all nodes

1) Create the ZooKeeper installation directory

# mkdir -p /data/zookeeper

2) Extract ZooKeeper into the installation directory

# tar -zxvf zookeeper-3.4.13.tar.gz -C /data/zookeeper/

3) Create the data directory

# mkdir -p /data/zookeeper/zookeeper-3.4.13/data

4) Create the log directory

# mkdir -p /data/zookeeper/zookeeper-3.4.13/dataLog

5) Set environment variables and reload the profile

# vim /etc/profile

===================================================

export ZK_HOME=/data/zookeeper/zookeeper-3.4.13

export PATH=$PATH:$ZK_HOME/bin

===================================================

# source /etc/profile

4.3 Configure ZooKeeper on all nodes

4.3.1  Per-node configuration

# cd /data/zookeeper/zookeeper-3.4.13/conf/

# cp -f zoo_sample.cfg zoo.cfg

# vim zoo.cfg

====================================================

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/zookeeper/zookeeper-3.4.13/data/

dataLogDir=/data/zookeeper/zookeeper-3.4.13/dataLog/

clientPort=2181

server.1=192.168.89.11:2888:3888

server.2=192.168.89.12:2888:3888

server.3=192.168.89.13:2888:3888

# The first port (default 2888) is used by followers to communicate with the leader; the second (default 3888) is used for leader election, both at initial startup and when the current leader fails

====================================================

# echo "1" > /data/zookeeper/zookeeper-3.4.13/data/myid   # on server1; differs per node and must match the server.1 number above

# echo "2" > /data/zookeeper/zookeeper-3.4.13/data/myid   # on server2; must match the server.2 number above

# echo "3" > /data/zookeeper/zookeeper-3.4.13/data/myid   # on server3; must match the server.3 number above
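The three echo commands above differ only in the node id. A minimal sketch of the rule (demonstrated against a scratch directory standing in for the real dataDir): each node writes its own server.N number into myid.

```shell
# Sketch: myid must contain this node's server.N number from zoo.cfg.
# A scratch directory stands in for /data/zookeeper/zookeeper-3.4.13/data here.
DATA_DIR=$(mktemp -d)
MY_ID=1                      # 1 on 192.168.89.11, 2 on .12, 3 on .13
echo "$MY_ID" > "$DATA_DIR/myid"
cat "$DATA_DIR/myid"
```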

4.3.2  Start/stop commands and enabling start on boot

1) Start and stop commands

# zkServer.sh start   # start

# zkServer.sh stop    # stop

# zkCli.sh            # connect to the ensemble

2) Enable start on boot

# cd /usr/lib/systemd/system

# vim zookeeper.service

=========================================

[Unit]

Description=zookeeper server daemon

After=network.target

 

[Service]

Type=forking

ExecStart=/data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh start

ExecReload=/bin/bash -c '/data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh stop && sleep 2 && /data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh start'

ExecStop=/data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh stop

Restart=always

 

[Install]

WantedBy=multi-user.target

=======================================================

# systemctl start zookeeper

# systemctl enable zookeeper

5  Build the Kafka cluster

5.1 Download

http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz

5.2 Install Kafka on all nodes

1)  Create the Kafka working directory

# mkdir -p /data/kafka

2)  Extract Kafka

# tar -zxvf kafka_2.12-2.1.0.tgz -C /data/kafka/

3)  Create the Kafka log directory

# mkdir -p /data/kafka/kafkalogs

4)  Edit the Kafka configuration file

# vim /data/kafka/kafka_2.12-2.1.0/config/server.properties

=================================================

broker.id=1   # unique positive integer identifying this broker in the cluster; differs per node

listeners=PLAINTEXT://192.168.89.11:9092  # address the socket server listens on (this node's IP)

log.dirs=/data/kafka/kafkalogs/    # where Kafka stores its log data

message.max.bytes=5242880     # maximum size of a message body, in bytes

log.cleaner.enable=true  # enable log cleaning

log.retention.hours=72    # how long segment files are kept (hours); data older than 3 days is deleted

log.segment.bytes=1073741824  # size of each log segment file (bytes); default 1 GB

log.retention.check.interval.ms=300000  # how often the retention policy is checked (ms)

num.partitions=3  # default number of partitions per topic; more partitions allow greater parallelism

default.replication.factor=3  # default replication factor per topic; must not exceed the number of brokers

delete.topic.enable=true   # enable topic deletion (default false)

replica.fetch.max.bytes=5242880  # maximum bytes a replica fetches per request

# The next three settings affect which brokers consumers can reach for committed offsets; see section 6 (Appendix)

offsets.topic.replication.factor=3  # replication factor of the offsets topic

transaction.state.log.replication.factor=3  # replication factor of the transaction state topic (set higher for availability)

transaction.state.log.min.isr=3  # overrides min.insync.replicas for the transaction topic

zookeeper.connect=192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181

# addresses of the ZooKeeper ensemble (may list several)

=================================================
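The numeric values above are round binary sizes and intervals; a quick sanity check of where they come from, using shell arithmetic:

```shell
# 5242880 bytes = 5 MiB (message.max.bytes, replica.fetch.max.bytes)
echo $(( 5 * 1024 * 1024 ))
# 1073741824 bytes = 1 GiB (log.segment.bytes)
echo $(( 1024 * 1024 * 1024 ))
# 300000 ms = 5 minutes (log.retention.check.interval.ms)
echo $(( 300000 / 1000 / 60 ))
```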

5)   A Kafka broker uses 1 GB of heap by default; to change this, edit kafka-server-start.sh

# vim /data/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh

# Find the KAFKA_HEAP_OPTS setting and modify it, for example:

export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"

5.3 Start Kafka and enable start on boot

1) Start Kafka

# cd /data/kafka/kafka_2.12-2.1.0/

./bin/kafka-server-start.sh -daemon ./config/server.properties

After starting, run jps to check whether the Kafka process is up; if startup failed, look at kafkaServer.out in the logs directory.

2) Enable start on boot

# cd /usr/lib/systemd/system

# vim kafka.service

=========================================

[Unit]

Description=kafka server daemon

After=network.target zookeeper.service

 

[Service]

Type=forking

ExecStart=/data/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.1.0/config/server.properties

ExecReload=/bin/bash -c '/data/kafka/kafka_2.12-2.1.0/bin/kafka-server-stop.sh && sleep 2 && /data/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.1.0/config/server.properties'

ExecStop=/data/kafka/kafka_2.12-2.1.0/bin/kafka-server-stop.sh

Restart=always

 

[Install]

WantedBy=multi-user.target

=======================================================

# systemctl start kafka

# systemctl enable kafka

5.4 Create a topic

Create a topic with 3 partitions and 3 replicas:

# cd /data/kafka/kafka_2.12-2.1.0/

# ./bin/kafka-topics.sh --create --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --replication-factor 3 --partitions 3 --topic SyslogTopic

5.5 Common commands

# cd /data/kafka/kafka_2.12-2.1.0/

1)  Stop Kafka

./bin/kafka-server-stop.sh 

2)  Create a topic

./bin/kafka-topics.sh --create --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --replication-factor 1 --partitions 1 --topic topic_name

3)  List topics

./bin/kafka-topics.sh --list --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181

4)  Describe a topic

./bin/kafka-topics.sh --describe --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --topic topic_name

5)  Produce messages with the console producer

./bin/kafka-console-producer.sh --broker-list 192.168.89.11:9092 --topic topic_name

6)  Consume messages with the console consumer

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.89.11:9092,192.168.89.12:9092,192.168.89.13:9092 --topic topic_name

7)  Delete a topic

./bin/kafka-topics.sh --delete --topic topic_name --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181

8)  Show the per-partition layout of __consumer_offsets (the brokers consumers can connect to for offsets)

./bin/kafka-topics.sh --describe --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --topic __consumer_offsets

6  Appendix

1.  Consumers reading partition data, and Kafka load balancing

In /data/kafka/kafka_2.12-2.1.0/config/server.properties, set offsets.topic.replication.factor to the number of brokers in the cluster. (__consumer_offsets is not governed by num.partitions or default.replication.factor in server.properties; its partition count and replication factor are controlled by offsets.topic.num.partitions and offsets.topic.replication.factor, which default to 50 and 1 respectively, i.e. a 50-partition topic with a single replica.) Verify the setting with the describe command:

(Run: bin/kafka-topics.sh --describe --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --topic __consumer_offsets)

Note: if Kafka was first started with offsets.topic.replication.factor=1, changing the setting to 3 and restarting will NOT alter the replication factor of the already-created topic. In that case, reassign the replicas manually:

1)  Create the reassignment JSON

cat > increase-replication-factor.json <<EOF

{"version":1, "partitions":[

{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":2,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":4,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":5,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":6,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":7,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":8,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":9,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":10,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":11,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":12,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":13,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":14,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":15,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":16,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":17,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":18,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":19,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":20,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":21,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":22,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":23,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":24,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":25,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":26,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":27,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":28,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":29,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":30,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":31,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":32,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":33,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":34,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":35,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":36,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":37,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":38,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":39,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":40,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":41,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":42,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":43,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":44,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":45,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":46,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":47,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":48,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":49,"replicas":[1,2,3]}]

}

EOF
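The 50 near-identical entries above need not be typed by hand; a sketch that generates the same file (assuming broker ids 1, 2, 3 and the default 50 partitions of __consumer_offsets):

```shell
# Generate increase-replication-factor.json for partitions 0..49,
# assigning replicas [1,2,3] to every partition of __consumer_offsets.
{
  echo '{"version":1, "partitions":['
  for p in $(seq 0 49); do
    sep=','
    [ "$p" -eq 49 ] && sep=''   # no trailing comma after the last entry
    echo "{\"topic\":\"__consumer_offsets\",\"partition\":${p},\"replicas\":[1,2,3]}${sep}"
  done
  echo ']}'
} > increase-replication-factor.json
```

The resulting file is interchangeable with the hand-written version and is fed to kafka-reassign-partitions.sh in exactly the same way.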

2)  Execute the reassignment

bin/kafka-reassign-partitions.sh --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --reassignment-json-file increase-replication-factor.json --execute

3)  Verify

bin/kafka-reassign-partitions.sh --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --reassignment-json-file increase-replication-factor.json --verify

 

