手把手教你搭建zookeeper+kafka集群+kafka managerg管理


1. kafka的定義

  kafka是一個分布式消息系統,由linkedin使用scala編寫,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。具有高水平擴展和高吞吐量。

2,基本的概念

(1)消費者(consumer):從消息隊列中請求消息的客戶端應用程序

(2)生產者(producer):向broker發布消息的應用程序

(3)AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,

動態的添加到磁盤並給每一條消息一個偏移量,所以對於kafka一個broker就是一個應用程序的實例

 

kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript

可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)

1. 主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。

2. 分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,

一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。

kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,

分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。

 

3 台服務器

192.168.120.207  master

192.168.120.208  salve

192.168.120.206  salve2

3,zookeeper 集群搭建

1)首先需要安裝 jdk

三個節點都需要安裝 jdk 支持:

(注意)三個節點都要安裝

tar zxvf jdk1.8.0_91.tar.gz  -C /usr/local/
vim /etc/profile    ##在最后添加java環境
JAVA_HOME=/usr/local/jdk1.8.0_91
JAVA_BIN=$JAVA_HOME/bin
PATH=$PATH:$JAVA_BIN
CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH

#把添加鏈接到bin目錄下
ln -vs /usr/local/jdk1.8.0_91/bin/java /usr/bin/

#測試java環境,打印java版本
java -version

  

2)首先需要安裝 zookeeper集群

##解壓zookeeper
tar xf zookeeper.tar.gz -C /usr/local/
cd /usr/local/zookeeper/conf/
##創建zookeeper所需要的目錄和myid文件 mkdir -pv /opt/{zkdata,kafka_data}
##zkdata是zookeeper數據目錄
##kafka_data是kafka數據目錄


##找到zoo.cfg,沒有就zoo_sample.cfg復制成zoo.cfg vim zoo.cfg #配置文件如下:
tickTime=2000 initLimit=10 syncLimit=5
dataDir=/opt/zkdata clientPort=30000 server.3=192.168.120.206:30010:30020 server.2=192.168.120.208:30010:30020 server.1=192.168.120.207:30010:30020

配置說明:
tickTime:客戶端與服務器或者服務器與服務器之間每個tickTime時間就會發送一次心跳。
通過心跳不僅能夠用來監聽機器的工作狀態,還可以通過心跳來控制Flower跟Leader的通信時間,默認2秒 initLimit:集群中的follower服務器(F)與leader服務器(L)
之間初始連接時能容忍的最多心跳數(tickTime的數量)。 syncLimit:集群中flower服務器(F)跟leader(L)
服務器之間的請求和答應最多能容忍的心跳數。 dataDir:該屬性對應的目錄是用來存放myid信息跟一些版本,
日志,跟服務器唯一的ID信息等。
clientPort:客戶端連接的接口,客戶端連接zookeeper服務器的端口,
zookeeper會監聽這個端口,接收客戶端的請求訪問!這個端口默認是2181,我自定義為30000。
30000是zookeeper端口
30010是zookeeper管理端口(leader)
30020是zookeeper節點端口(follower)
service.N=YYY:A:B
N:代表服務器編號(也就是myid里面的值)
YYY:服務器地址
A:表示 Flower 跟 Leader的通信端口,簡稱服務端內部通信的端口(默認2888)
B:表示 是選舉端口(默認是3888)

其他節點配置相同,除以下配置:
echo 1 > /opt/zkdata/myid
#myid文件,里面的內容為數字,
用於標識主機,如果這個文件沒有的話,zookeeper無法啟動
我的主機:192.168.120.207是1所以,echo 1 > /opt/zkdata/myid,其他主機變更echo寫入的數字即可

啟動zookeeper:
/usr/local/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... taSTARTED
zkServer.sh start   啟動
zkServer.sh status 查看啟動狀態,看那個是leader節點,因為是集群自動選舉的。

    

     

 

 啟動成功,到這里zookeeper集群搭建好了

 

 30010端口是leader,因為是自定義的,再說一遍。

  

3)首先需要安裝 kafka集群

三個節點一樣的操作

解壓kafka安裝包

tar xf kafka_2.12-0.10.2.1.tgz -C /usr/local/

vim /usr/local/kafka_2.12-0.10.2.1/config/server.properties

##kafka配置文件如下:
主要修改標記的4項,沒特殊需求可以暫時不動其他配置項。
[root@salve2 zk+kafka]# grep -v "#" /usr/local/kafka_2.12-0.10.2.1/config/server.properties ############################# Server Basics ############################# broker.id=1 #id唯一,可以與之前的myid設置成一致的。 ############################# Socket Server Settings ############################# advertised.listeners=PLAINTEXT://192.168.120.206:9092 ##設置本機ip及端口 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/opt/kafka_data ##kafka數據目錄 num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=192.168.120.207:30000,192.168.120.208:30000,192.168.120.206:30000 ##是zookeeper集群地址和端口,以,分隔 zookeeper.connection.timeout.ms=6000

  

3)啟動kafka集群

##后台啟動kafka,每台都要啟動
nohup /usr/local/kafka_2.12-0.10.2.1/bin/kafka-server-start.sh  /usr/local/kafka_2.12-0.10.2.1/config/server.properties &

##查看端口確保:30000和30010或者30020,因為這是zookeeper的端口,這些端口沒有kafka是起不起來的。
 

 

4)創建主題驗證生產者與消費者

創建主題為pytest1:
創建命令:/usr/local/kafka_2.12-0.10.2.1/bin/kafka-topics.sh --create --zookeeper 192.168.120.208:30000,192.168.120.207:30000,192.168.120.206:30000 --replication-factor 1  --partitions 1 --topic pytest1 Created topic "pytest1".

 

參數解釋:

  --create   ##創建

  --zookeeper  ##在zookeeper集群,以英文逗號分隔

  --replication-factor  ##復制多少份

   --partitions        ##備份多少份

   --topic     ##主題名稱

 

啟動一個生產者,多個消費者:

生產者:

./kafka-console-producer.sh --broker-list 192.168.120.207:9092,192.168.120.208:9092,192.168.120.206:9092 --topic pytest1

消費者:

/usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.120.207:9092,192.168.120.208:9092,192.168.120.206:9092 --topic  pytest1  --from-beginning

 

 

 

5)管理kafka集群:

kafka manager 下載地址:https://github.com/yahoo/kafka-manager   可以下載源碼自行編譯。這里使用已經編譯好的包直接搭建。

當然也可以用我這已經編譯好的包。

鏈接:https://pan.baidu.com/s/1RQHARizISqfCGqqe5nz4yg
提取碼:qzgp

解壓kafka-manager
unzip kafka-manager-1.3.3.7.zip
移動到統一安裝目錄下
mv kafka-manager-1.3.3.7 /usr/local/
cd /usr/local/kafka-manager-1.3.3.7/
vim application.conf
修改兩個地方:
kafka-manager.zkhosts
akka
示例如下:
kafka-manager.zkhosts="192.168.120.207:30000,192.168.120.208:30000,192.168.120.206:30000"
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logger-startup-timeout = 30s
}
添加:logger-startup-timeout = 30s要不然報錯。
我沒有添加前報錯如下:
 [WARN] [07/30/2020 15:17:00.493] [main] [EventStream(akka://kafka-manager-system)] Logger log1-Slf4

啟動:
./kafka-manager -Dconfig.file=../conf/application.conf  -Dhttp.port=8080 &

訪問:192.168.120.207:8080

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM