Kafka的集群部署和使用


#相關概念和工作流可以參考大神的這兩篇博客

https://www.cnblogs.com/kevingrace/p/9443270.html

https://www.cnblogs.com/kevingrace/p/9021508.html

1.集群服務器架構信息以及基礎環境准備

1.服務器架構規划

ip地址            主機名          安裝軟件
172.31.46.28    kafka01         zookeeper、kafka
172.31.46.63    kafka02         zookeeper、kafka
172.31.46.67    kafka03         zookeeper、kafka
172.31.46.26   kafka-manager   kafka-manager

2.關閉防火牆和綁定hosts(集群所有節點都操作)

#設置主機名,每個節點都做。
[root@k8s-master01 ~]# hostnamectl set-hostname kafka01
#4台機器關閉iptables和selinux
[root@kafka01 ~]# /etc/init.d/iptables stop
[root@kafka01 ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[root@kafka01 ~]# setenforce 0
[root@kafka01 ~]# getenforce
Permissive
#4台機器做hosts綁定,並檢查,如果集群規模較大建議做個bind9
[root@kafka01 ~]# vim /etc/hosts
172.31.46.28 kafka01 172.31.46.63 kafka02 172.31.46.67 kafka03 172.31.46.26  kafka-manager
[root@kafka01 ~]# ping kafka-manager
PING node3 (172.31.46.26) 56(84) bytes of data.
64 bytes from node3 (172.31.46.26): icmp_seq=1 ttl=64 time=0.948 ms
[root@kafka01 ~]# ping kafka02
PING node1 (172.31.46.63) 56(84) bytes of data.
64 bytes from node1 (172.31.46.63): icmp_seq=1 ttl=64 time=0.302 ms

 3.jdk安裝(四台機器都要操作,安裝1.7以上版本

#將jdk-8u131-linux-x64.rpm下載到/opt目錄下 下載地址:https://pan.baidu.com/s/1pLaAjPp
提取密碼:x27s
  
[root@kafka01 ~]# cd /usr/local/src/
[root@kafka01 src]# ll jdk-8u131-linux-x64.rpm
-rw-r--r--. 1 root root 169983496 Sep 28  2017 jdk-8u131-linux-x64.rpm
[root@kafka01 src]# rpm -ivh jdk-8u131-linux-x64.rpm
  
[root@kafka01 src]# vim /etc/profile
......
JAVA_HOME=/usr/java/jdk1.8.0_131
JAVA_BIN=/usr/java/jdk1.8.0_131/bin
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/bin:/sbin/
CLASSPATH=.:/lib/dt.jar:/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
  
[root@kafka01 src]# source /etc/profile
[root@kafka01 src]# java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

 

4.安裝zookeeper集群(以下操作需在所有zookeeper節點操作)

#我們先cd到我們的安裝目錄下(這個根據每個公司的情況,都不一樣)
[root@kafka01 ~]# cd /iflytek
[root@kafka01 iflytek]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
[root@kafka01 iflytek]# tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
#做軟連接,方面后面升級
[root@kafka01 iflytek]# ln -sv apache-zookeeper-3.6.2-bin zookeeper
[root@kafka01 iflytek]# mkdir -p zookeeper/data [root@kafka01 iflytek]# cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo_sample.cfg.bak [root@kafka01 iflytek]# cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg #修改zookeeper的配置文件,清楚之前的內容,加入新的內容如下所示
[root@kafka01 iflytek]# vim zookeeper
/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/iflytek/zookeeper/data/zookeeper dataLogDir=/iflytek/zookeeper/data/logs clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 server.1=172.31.46.28:2888:3888 server.2=172.31.46.63:2888:3888 server.3=172.31.46.67:2888:3888 -------------------------------------
配置參數說明:
tickTime=2000 #服務器與服務器之間和客戶端與服務器之間的單次心跳檢測時間間隔, 單 位為毫秒, 類似於 haproxy 針對 real server 的 check inter 間隔時間 initLimit=10 #集群中 leader 服務器與 follower 服務器初始連接心跳次數, 即多少個 2000 毫 秒 syncLimit=5 # leader 與 follower 之間連接完成之后,后期檢測發送和應答的心跳次數,如 果該 follower 在設置的時間內(5*2000)不能與 leader 進行通信,那么此 follower 將被視為 不可用。 clientPort=2181 #客戶端連接 Zookeeper 服務器的端口, Zookeeper 會監聽這個端口,接受 客戶端的訪問請求 autopurge.snapRetainCount=3 #設置 zookeeper 保存保留多少次客戶端連接的數據 autopurge.purgeInterval=1 #設置 zookeeper 間隔多少小時清理一次保存的客戶端數據 server.1=172.31.46.28:2888:3888#服務器編號=服務器 IP:LF 數據同步端口:LF 選舉端口(表示了不同的zookeeper服務器的自身標識,
作為集群的一部分,每一台服務器應該知道其他服務器的信息。 用戶可以從
"server.id=host:port:port" 中讀取到相關信息。 在服務器的data(dataDir參數所指定的目錄)下創建一個文件名為myid的文件,這個文件的內容只有一行,指定的是自身的id值。 比如,服務器"1"應該在myid文件中寫入"1"。這個id必須在集群環境中服務器標識中是唯一的,且大小在1~255之間。) ----------------------------------
#注意:如果想更換日志輸出位置,除了在zoo.cfg加入"dataLogDir=/iflytek/zookeeper/data/logs"外,還需要修改zkServer.sh文件,
大概修改方式地方在
141行左右,內容如下
[root@kafka01 iflytek]# cp zookeeper/bin/zkServer.sh zookeeper/bin/zkServer.sh.bak
[root@kafka01 iflytek]# vim zookeeper/bin/zkServer.sh
。。。。。。。。。。

 141 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"   #添加這一行
 142 if [ ! -w "$ZOO_LOG_DIR" ] ; then
 143 mkdir -p "$ZOO_LOG_DIR"
 144 fi

 #在啟動zookeeper服務之前,還需要在zookeeper節點機器上創建myid,方式如下:
[root@kafka01 iflytek]# mkdir zookeeper/data/zookeeper/ #因為我是在服務器1上演示的所以這里把1寫進myid這個文件。其他節點類似
[root@kafka01 iflytek]# echo 1 > zookeeper/data/zookeeper/myid
#啟動zookeeper服務,並檢查
[root@kafka01 iflytek]# zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@kafka01 iflytek]# ps -ef|grep zookeeper
[root@kafka01 iflytek]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 735986 root 56u IPv4 3794176 0t0 TCP *:eforward (LISTEN)
#查看各個節點zookeeper的角色

[root@kafka01 iflytek]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@kafka02 iflytek]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@kafka03 iflytek]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

 
 

 

2.安裝部署kafka和集群管理工具kafka-manager

1.安裝kafaka(三個節點同樣操作)

[root@kafka01 ~]# cd /iflytek
[root@kafka01 iflytek]# wget https://mirror.bit.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
[root@kafka01 iflytek]# tar -zvxf kafka_2.13-2.7.0.tgz
[root@kafka01 iflytek]# ln -sv kafka_2.13-2.7.0 kafka
#進入kafka下面的config目錄,修改配置文件server.properties:
[root@kafka01 iflytek]# cp kafka/config/server.properties kafka/config/server.properties.bak
[root@kafka01 iflytek]# vim kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://172.31.46.28:9092
num.network.threads=3
num.io.threads
=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/iflytek/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #其他兩個節點的server.properties只需要修改下面兩行,其他配置都一樣 [root@kafka02 iflytek]# vim kafka/config/server.properties broker.id=1 ...... listeners=PLAINTEXT://172.31.46.63:9092 ....... [root@kafka03 iflytek]# vim kafka/config/server.properties broker.id=2 ...... listeners=PLAINTEXT://172.31.46.67:9092 ...... #啟動kafka服務 [root@kafka01 iflytek]# nohup kafka/bin/kafka-server-start.sh kafka/config/server.properties >/dev/null 2>&1 & [root@kafka01 iflytek]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 876461 root 122u IPv4 4531968 0t0 TCP ceph-deploy:XmlIpcRegSvc (LISTEN) java 876461 root 141u IPv4 4531973 0t0 TCP ceph-deploy:59560->ceph-deploy:XmlIpcRegSvc (ESTABLISHED) java 876461 root 142u IPv4 4532804 0t0 TCP ceph-deploy:XmlIpcRegSvc->ceph-deploy:59560 (ESTABLISHED) #驗證服務,隨便在其中一台節點主機執行 [root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --create --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --replication-factor 1 --partitions 1 --topic test 出現下面信息說明創建成功 Created topic test. #然后再在其他主機查看上面創建的topic [root@kafka02 iflytek]# kafka/bin/kafka-topics.sh --list --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 test 到此,kafka集群環境已部署完成!

 2.Kafka命令行操作

#查看當前服務器中的所有topic
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --list
__consumer_offsets
test
test1
#創建topic
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --create --replication-factor 3 --partitions 1 --topic test2
選項說明: --topic 定義 topic 名 --replication-factor 定義副本數 --partitions 定義分區數 #刪除topic(需要 server.properties 中設置 delete.topic.enable=true 否則只是標記刪除)
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --delete --topic test
#查看某個Topic的詳情
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --describe --topic test1
Topic: test1    PartitionCount: 1    ReplicationFactor: 3    Configs: 
    Topic: test1    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0 #修改分區數
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --alter --topic test1 --partitions 6 #測試生產消息,這個下面的地址加端口填kafka集群中的節點就行了
[root@kafka01 iflytek]# kafka/bin/kafka-console-producer.sh --broker-list 172.31.46.28:9092 --topic test1 >Hello
#測試消費信息
[root@kafka02 iflytek]# kafka/bin/kafka-console-consumer.sh  --bootstrap-server 172.31.46.63:9092 --topic test1
Hello
#查看指定topic中以往所有的數據都讀出來(這個前提是消息數據還在默認的保留周期內)--from-beginning: 會把主題中以往所有的數據都讀取出來。
[root@kafka03 iflytek]# kafka/bin/kafka-console-consumer.sh  --bootstrap-server 172.31.46.67:9092 --from-beginning --topic test1

 

3.安裝kafka集群管理工具kafka-manager簡單使用介紹

為了簡化開發者和服務工程師維護Kafka集群的工作,yahoo構建了一個叫做Kafka管理器的基於Web工具,叫做 Kafka Manager。kafka-manager 項目地址:https://github.com/yahoo/kafka-manager。這個管理工具可以很容易地發現分布在集群中的哪些topic分布不均勻,或者是分區在整個集群分布不均勻的的情況。它支持管理多個集群、選擇副本、副本重新分配以及創建Topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具,kafka-manager有如下功能:
- 管理多個kafka集群
- 便捷的檢查kafka集群狀態(topics,brokers,備份分布情況,分區分布情況)
- 選擇你要運行的副本
- 基於當前分區狀況進行
- 可以選擇topic配置並創建topic(0.8.1.1和0.8.2的配置不同)
- 刪除topic(只支持0.8.2以上的版本並且要在broker配置中設置delete.topic.enable=true)
- Topic list會指明哪些topic被刪除(在0.8.2以上版本適用)
- 為已存在的topic增加分區
- 為已存在的topic更新配置
- 在多個topic上批量重分區
- 在多個topic上批量重分區(可選partition broker位置)

kafka-manager安裝過程如下

下載安裝 kafka-manager
想要查看和管理Kafka,完全使用命令並不方便,我們可以使用雅虎開源的Kafka-manager,GitHub地址如下:
https://github.com/yahoo/kafka-manager
 
也可以使用Git或者直接從Releases中下載,此處從下面的地址下載 1.3.3.7 版本:
https://github.com/yahoo/kafka-manager/releases
 
需要注意:
上面下載的是源碼,下載后需要按照后面步驟進行編譯。如果覺得麻煩,可以直接下載編譯好的kafka-manager-1.3.3.7.zip。
下載地址:https://pan.baidu.com/s/12j2DEt94WsWRY6dD9aR6BQ
提取密碼:8x57
[root@kafka-manager ~]# cd /iflytek
[root@kafka-manager iflytek]# unzip kafka-manager-1.3.3.7.zip
[root@kafka-manager iflytek]# ln -sv kafka-manager-1.3.3.7 kafka-manager
[root@kafka-manager iflytek]# cp kafka-manager/conf/application.conf kafka-manager/conf/application.conf.bak
[root@kafka-manager iflytek]# vim kafka-manager/conf/application.conf
................
#kafka-manager.zkhosts="localhost:2181"                  #注釋這一行,下面添加一行,下面的地址,是zookeeper的節點地址
kafka-manager.zkhosts="172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181"
#啟動kafka-manager
[root@kafka-manager iflytek]# nohup kafka-manager/bin/kafka-manager >/dev/null 2>&1 &
----------------------------------------------------------------------------------------------------
需要注意:
kafka-manager 默認的端口是9000,可通過 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
[root@kafka-manager iflytek]# nohup kafka-manager/bin/kafka-manager -Dconfig.file=kafka-manager/conf/application.conf -Dhttp.port=8080 &
----------------------------------------------------------------------------------------------------
啟動完畢后可以查看端口是否啟動,由於啟動過程需要一段時間,端口起來的時間可能會延后。
[root@kafka-manager iflytek]# lsof -i:9000
COMMAND    PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    651368 root  150u  IPv4 3296558      0t0  TCP *:cslistener (LISTEN)
最后就可以使用http://172.31.46.26:9000訪問了

 

 

 kafka-mamager測試
新建 Cluster1
點擊【Cluster】>【Add Cluster】打開如下添加集群的配置界面:
輸入集群的名字(如Kafka-Cluster-test)和 Zookeeper 服務器地址(如localhost:2181),選擇最接近的Kafka版本(如0.10.1.1)
-------------------------------------------------------------------
注意:如果沒有在 Kafka 中配置過 JMX_PORT,千萬不要選擇第一個復選框。
Enable JMX Polling
如果選擇了該復選框,Kafka-manager 可能會無法啟動。

 

 新建完成后,運行界面如下:

 

 

 

 

 

 查看broker信息

 

 管理 kafka-mamager

新建主題(消息類別)
點擊【Topic】>【Create】可以方便的創建並配置主題。如下顯示。

 

 查看結果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM