1. kafka的定義
kafka是一個分布式消息系統,由linkedin使用scala編寫,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。具有高水平擴展和高吞吐量。
2,基本的概念
(1)消費者(consumer):從消息隊列中請求消息的客戶端應用程序
(2)生產者(producer):向broker發布消息的應用程序
(3)AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,
動態的添加到磁盤並給每一條消息一個偏移量,所以對於kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)
1. 主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。
2. 分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,
一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,
分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
3 台服務器
192.168.120.207 master
192.168.120.208 salve
192.168.120.206 salve2
3,zookeeper 集群搭建
1)首先需要安裝 jdk
三個節點都需要安裝 jdk 支持:
(注意)三個節點都要安裝 tar zxvf jdk1.8.0_91.tar.gz -C /usr/local/ vim /etc/profile ##在最后添加java環境 JAVA_HOME=/usr/local/jdk1.8.0_91 JAVA_BIN=$JAVA_HOME/bin PATH=$PATH:$JAVA_BIN CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME JAVA_BIN PATH CLASSPATH #把添加鏈接到bin目錄下 ln -vs /usr/local/jdk1.8.0_91/bin/java /usr/bin/ #測試java環境,打印java版本 java -version
2)首先需要安裝 zookeeper集群
##解壓zookeeper tar xf zookeeper.tar.gz -C /usr/local/ cd /usr/local/zookeeper/conf/
##創建zookeeper所需要的目錄和myid文件 mkdir -pv /opt/{zkdata,kafka_data}
##zkdata是zookeeper數據目錄
##kafka_data是kafka數據目錄
##找到zoo.cfg,沒有就zoo_sample.cfg復制成zoo.cfg vim zoo.cfg #配置文件如下:
tickTime=2000 initLimit=10 syncLimit=5
dataDir=/opt/zkdata clientPort=30000 server.3=192.168.120.206:30010:30020 server.2=192.168.120.208:30010:30020 server.1=192.168.120.207:30010:30020
配置說明:
tickTime:客戶端與服務器或者服務器與服務器之間每個tickTime時間就會發送一次心跳。
通過心跳不僅能夠用來監聽機器的工作狀態,還可以通過心跳來控制Flower跟Leader的通信時間,默認2秒 initLimit:集群中的follower服務器(F)與leader服務器(L)
之間初始連接時能容忍的最多心跳數(tickTime的數量)。 syncLimit:集群中flower服務器(F)跟leader(L)
服務器之間的請求和答應最多能容忍的心跳數。 dataDir:該屬性對應的目錄是用來存放myid信息跟一些版本,
日志,跟服務器唯一的ID信息等。
clientPort:客戶端連接的接口,客戶端連接zookeeper服務器的端口,
zookeeper會監聽這個端口,接收客戶端的請求訪問!這個端口默認是2181,我自定義為30000。
30000是zookeeper端口
30010是zookeeper管理端口(leader)
30020是zookeeper節點端口(follower)
service.N=YYY:A:B N:代表服務器編號(也就是myid里面的值) YYY:服務器地址 A:表示 Flower 跟 Leader的通信端口,簡稱服務端內部通信的端口(默認2888) B:表示 是選舉端口(默認是3888)
其他節點配置相同,除以下配置:
echo 1 > /opt/zkdata/myid #myid文件,里面的內容為數字,
用於標識主機,如果這個文件沒有的話,zookeeper無法啟動
我的主機:192.168.120.207是1所以,echo 1 > /opt/zkdata/myid,其他主機變更echo寫入的數字即可
啟動zookeeper:
/usr/local/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... taSTARTED
zkServer.sh start 啟動
zkServer.sh status 查看啟動狀態,看那個是leader節點,因為是集群自動選舉的。
啟動成功,到這里zookeeper集群搭建好了
30010端口是leader,因為是自定義的,再說一遍。
3)首先需要安裝 kafka集群
三個節點一樣的操作 解壓kafka安裝包 tar xf kafka_2.12-0.10.2.1.tgz -C /usr/local/ vim /usr/local/kafka_2.12-0.10.2.1/config/server.properties ##kafka配置文件如下: 主要修改標記的4項,沒特殊需求可以暫時不動其他配置項。
[root@salve2 zk+kafka]# grep -v "#" /usr/local/kafka_2.12-0.10.2.1/config/server.properties ############################# Server Basics ############################# broker.id=1 #id唯一,可以與之前的myid設置成一致的。 ############################# Socket Server Settings ############################# advertised.listeners=PLAINTEXT://192.168.120.206:9092 ##設置本機ip及端口 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/opt/kafka_data ##kafka數據目錄 num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=192.168.120.207:30000,192.168.120.208:30000,192.168.120.206:30000 ##是zookeeper集群地址和端口,以,分隔 zookeeper.connection.timeout.ms=6000
3)啟動kafka集群
##后台啟動kafka,每台都要啟動
nohup /usr/local/kafka_2.12-0.10.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-0.10.2.1/config/server.properties &
##查看端口確保:30000和30010或者30020,因為這是zookeeper的端口,這些端口沒有kafka是起不起來的。

4)創建主題驗證生產者與消費者
創建主題為pytest1:
創建命令:/usr/local/kafka_2.12-0.10.2.1/bin/kafka-topics.sh --create --zookeeper 192.168.120.208:30000,192.168.120.207:30000,192.168.120.206:30000 --replication-factor 1 --partitions 1 --topic pytest1 Created topic "pytest1".

參數解釋:
--create ##創建
--zookeeper ##在zookeeper集群,以英文逗號分隔
--replication-factor ##復制多少份
--partitions ##備份多少份
--topic ##主題名稱
啟動一個生產者,多個消費者:
生產者:
./kafka-console-producer.sh --broker-list 192.168.120.207:9092,192.168.120.208:9092,192.168.120.206:9092 --topic pytest1
消費者:
/usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.120.207:9092,192.168.120.208:9092,192.168.120.206:9092 --topic pytest1 --from-beginning
5)管理kafka集群:
kafka manager 下載地址:https://github.com/yahoo/kafka-manager 可以下載源碼自行編譯。這里使用已經編譯好的包直接搭建。
當然也可以用我這已經編譯好的包。
鏈接:https://pan.baidu.com/s/1RQHARizISqfCGqqe5nz4yg
提取碼:qzgp
解壓kafka-manager unzip kafka-manager-1.3.3.7.zip 移動到統一安裝目錄下 mv kafka-manager-1.3.3.7 /usr/local/ cd /usr/local/kafka-manager-1.3.3.7/ vim application.conf 修改兩個地方: kafka-manager.zkhosts akka 示例如下: kafka-manager.zkhosts="192.168.120.207:30000,192.168.120.208:30000,192.168.120.206:30000" akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "INFO" logger-startup-timeout = 30s } 添加:logger-startup-timeout = 30s要不然報錯。 我沒有添加前報錯如下: [WARN] [07/30/2020 15:17:00.493] [main] [EventStream(akka://kafka-manager-system)] Logger log1-Slf4 啟動: ./kafka-manager -Dconfig.file=../conf/application.conf -Dhttp.port=8080 &
訪問:192.168.120.207:8080