1. kafka的定義
kafka是一個分布式消息系統,由linkedin使用scala編寫,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。具有高水平擴展和高吞吐量。
2. kafka 和其他主流分布式消息系統的對比
定義解釋:
1. Java 和 scala都是運行在JVM上的語言。
2. erlang和最近比較火的和go語言一樣是從代碼級別就支持高並發的一種語言,所以RabbitMQ天生就有很高的並發性能,但是有RabbitMQ嚴格按照AMQP進行實現,受到了很多限制。kafka的設計目標是高吞吐量,所以kafka自己設計了一套高性能但是不通用的協議,他也是仿照AMQP( Advanced Message Queuing Protocol 高級消息隊列協議)設計的。
3. 事務的概念:在數據庫中,多個操作一起提交,要么操作全部成功,要么全部失敗。舉個例子, 在轉賬的時候付款和收款,就是一個事物的例子,你給一個人轉賬,你轉成功,並且對方正常行收到款項后,這個操作才算成功,有一方失敗,那么這個操作就是失敗的。 對應消在息隊列中,就是多條消息一起發送,要么全部成功,要么全部失敗。3個中只有ActiveMQ支持,這個是因為,RabbitMQ和Kafka為了更高的性能,而放棄了對事物的支持 。
4. 集群:多台服務器組成的整體叫做集群,這個整體對生產者和消費者來說,是透明的。其實對消費系統組成的集群添加一台服務器減少一台服務器對生產者和消費者都是無感之的。
5. 負載均衡,對消息系統來說負載均衡是大量的生產者和消費者向消息系統發出請求消息,系統必須均衡這些請求使得每一台服務器的請求達到平衡,而不是大量的請求,落到某一台或幾台,使得這幾台服務器高負荷或超負荷工作,嚴重情況下會停止服務或宕機。
6. 動態擴容是很多公司要求的技術之一,不支持動態擴容就意味着停止服務,這對很多公司來說是不可以接受的。
最后,kafka的動態擴容是通過 zookeeper 來實現的。
zookeeper是一種在分布式系統中被廣泛用來作為:分布式狀態管理、分布式協調管理、分布式配置管理、和分布式鎖服務的集群。kafka增加和減少服務器都會在zookeeper節點上觸發相應的事件,kafka系統會捕獲這些事件,進行新一輪的負載均衡,客戶端也會捕獲這些事件來進行新一輪的處理。
3. kafka 相關概念
3.1 AMQP協議
Advanced Message Queuing Protocol (高級消息隊列協議)
The Advanced Message Queuing Protocol (AMQP):是一個標准開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了通過網絡發送的字節流的數據格式。因此兼容性非常好,任何實現AMQP協議的程序都可以和與AMQP協議兼容的其他程序交互,可以很容易做到跨語言,跨平台。
上面說的3種比較流行的消息隊列協議,要么支持AMQP協議,要么借鑒了AMQP協議的思想進行了開發、實現、設計。
3.2 一些基本的概念
(1)消費者(consumer):從消息隊列中請求消息的客戶端應用程序
(2)生產者(producer):向broker發布消息的應用程序
(3)AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,所以對於kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)
3.3 kafka 架構
生產者生產消息、kafka集群、消費者獲取消息這樣一種架構,如下圖:
kafka集群中的消息,是通過Topic(主題)來進行組織的,如下圖:
一些基本的概念:
1. 主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。
2. 分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
工作圖:
備份(Replication):為了保證分布式可靠性,kafka0.8開始對每個分區的數據進行備份(不同的Broker上),防止其中一個Broker宕機造成分區上的數據不可用。
以上基礎知識部分摘抄:https://www.cnblogs.com/luotianshuai/p/5206662.html#autoid-0-0-0
4. 集群的搭建
4.1 基礎環境
3 台服務器
192.168.118.14 server1
192.168.118.15 server2
192.168.118.16 server3
Linux服務器一台、三台、五台,zookeeper集群的工作是超過半數才能對外提供服務。
為什么 zookeeper 集群節點數量要是奇數?
首先需要明確 zookeeper 選舉的規則:leader選舉,要求 可用節點數量 > 總節點數 / 2 注意是 > 不是 ≥
采用奇數個的節點主要是出於兩方面的考慮:
1. 防止由腦裂造成的集群不可用
首先,什么是腦裂?集群的腦裂通常是發生在節點之間通信不可達的情況下,集群會分裂成不同的小集群,小集群各自選出自己的master節點,導致原有的集群出現多個master節點的情況,這就是腦裂。
下面舉例說一下為什么采用奇數台節點,就可以防止由於腦裂造成的服務不可用:
(1) 假如zookeeper集群有 5 個節點,發生了腦裂,腦裂成了A、B兩個小集群:
(a) A : 1個節點 ,B :4個節點 , 或 A、B互換
(b) A : 2個節點, B :3個節點 , 或 A、B互換
可以看出,上面這兩種情況下,A、B中總會有一個小集群滿足 可用節點數量 > 總節點數量/2 。所以zookeeper集群仍然能夠選舉出leader , 仍然能對外提供服務,只不過是有一部分節點失效了而已。
(2) 假如zookeeper集群有4個節點,同樣發生腦裂,腦裂成了A、B兩個小集群:
(a) A:1個節點 , B:3個節點, 或 A、B互換
(b) A:2個節點 , B:2個節點
可以看出,情況(a) 是滿足選舉條件的,與(1)中的例子相同。 但是情況(b) 就不同了,因為A和B都是2個節點,都不滿足 可用節點數量 > 總節點數量/2 的選舉條件, 所以此時zookeeper就徹底不能提供服務了。
綜合上面兩個例子可以看出: 在節點數量是奇數個的情況下, zookeeper集群總能對外提供服務(即使損失了一部分節點);如果節點數量是偶數個,會存在zookeeper集群不能用的可能性(腦裂成兩個均等的子集群的時候)。
在生產環境中,如果zookeeper集群不能提供服務,那將是致命的 , 所以zookeeper集群的節點數一般采用奇數個。
2. 在容錯能力相同的情況下,奇數台更節省資源
leader選舉,要求 可用節點數量 > 總節點數量/2 。注意 是 > , 不是 ≥。
舉兩個例子:
(1) 假如zookeeper集群1 ,有3個節點,3/2=1.5 , 即zookeeper想要正常對外提供服務(即leader選舉成功),至少需要2個節點是正常的。換句話說,3個節點的zookeeper集群,允許有一個節點宕機。
(2) 假如zookeeper集群2,有4個節點,4/2=2 , 即zookeeper想要正常對外提供服務(即leader選舉成功),至少需要3個節點是正常的。換句話說,4個節點的zookeeper集群,也允許有一個節點宕機。
那么問題就來了, 集群1與集群2都有 允許1個節點宕機 的容錯能力,但是集群2比集群1多了1個節點。在相同容錯能力的情況下,本着節約資源的原則,zookeeper集群的節點數維持奇數個更好一些。
4.2 zookeeper 集群搭建
服務器默認關閉 selinux 和 防火牆,在 kafka_2.12-1.0.1 中,已經集成了 zookeeper,不需要單獨安裝 zookeeper
(1)首先需要安裝 jdk
三個節點都需要安裝 jdk 支持:
[root@node1 ~]# tar xf jdk-8u77-linux-x64.tar.gz -C /usr/local/ [root@node1 ~]# vim /etc/profile # 在文件追加以下信息 JAVA_HOME=/usr/local/jdk1.8.0_77 JAVA_BIN=$JAVA_HOME/bin PATH=$PATH:$JAVA_BIN CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@node1 ~]# ln -vs /usr/local/jdk1.8.0_77/bin/java /usr/bin/
(2)配置 zookeeper 集群
[root@node1 ~]# mkdir -pv /opt/kafka/{zkdata,zkdatalogs} mkdir: created directory ‘/opt/kafka’ # kafka 安裝程序主目錄 mkdir: created directory ‘/opt/kafka/zkdata’ # zookeeper 存放快照日志 mkdir: created directory ‘/opt/kafka/zkdatalogs’ # zookeeper 存放事務日志 [root@node1 ~]# tar xf kafka_2.12-1.0.1.tgz -C /opt/kafka/ [root@node1 ~]# cd /opt/kafka/kafka_2.12-1.0.1/config/ [root@node1 /opt/kafka/kafka_2.12-1.0.1/config]# egrep ^[a-z] zookeeper.properties dataDir=/opt/kafka/zkdata dataLogDir=/opt/kafka/zkdatalogs clientPort=2181 maxClientCnxns=100 tickTime=2000 initLimit=10 syncLimit=5 server.1=192.168.118.14:2888:3888 server.2=192.168.118.15:2888:3888 server.3=192.168.118.16:2888:3888 # server.1 這個 1 是服務器的標識也可以是其他的數字, 表示這個是第幾號服務器,用來標識服務器,這個標識要寫到快照目錄下面myid文件里 #192.168.118.16為集群里的IP地址,第一個端口是master和slave之間的通信端口,默認是2888,第二個端口是leader選舉的端口,集群剛啟動的時候選舉或者leader掛掉之后進行新的選舉的端口默認是3888

dataDir: 快照日志的存儲路徑 dataLogDir: 事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日志、快照日志太多 clientPort: 這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。 maxClientCnxns: 客戶端最大連接數 tickTime: 這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。 initLimit: 這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒 syncLimit: 這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
創建 myid 文件(切記不能忘記這個步驟)
myid 文件對應:
server.1=192.168.118.14:2888:3888
server.2=192.168.118.15:2888:3888
server.3=192.168.118.16:2888:3888
# node1 echo 1 > /opt/kafka/zkdata/myid # node2 echo 2 > /opt/kafka/zkdata/myid # node3 echo 3 > /opt/kafka/zkdata/myid
三個節點,配置相同,zookeeper 配置唯一不同點就是 myid
啟動服務(三個節點都需要啟動)
[root@node1 ~]# cd /opt/kafka/kafka_2.12-1.0.1/bin/ [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# jps -m 11650 QuorumPeerMain ../config/zookeeper.properties 11679 Jps -m [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# netstat -ntplu | egrep java tcp6 0 0 192.168.118.14:3888 :::* LISTEN 11650/java tcp6 0 0 :::38269 :::* LISTEN 11650/java tcp6 0 0 :::2181 :::* LISTEN 11650/java
查看集群狀態:
# node1 [root@node1 ~]# echo status | nc localhost 2181 Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT Clients: /0:0:0:0:0:0:0:1:50099[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: follower Node count: 4 # node2 [root@node2 ~]# echo status | nc localhost 2181 Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT Clients: /0:0:0:0:0:0:0:1:60588[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x100000000 Mode: leader Node count: 4 # node3 [root@node3 ~]# echo status | nc localhost 2181 Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT Clients: /0:0:0:0:0:0:0:1:40457[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x100000000 Mode: follower Node count: 4
目前 node2 是 leader 節點
4.3 Kafka 集群搭建
首先創建 kafka 日志目錄,后面配置會用到
[root@node2 ~]# mkdir -pv /opt/kafka/kafka-logs mkdir: created directory ‘/opt/kafka/kafka-logs’
kafka 配置文件如下(綠色部分是需要修改的):

broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 listeners=PLAINTEXT://192.168.118.14:9092 #當前kafka對外提供服務的端口默認是9092 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 log.dirs=/opt/kafka/kafka_2.12-1.0.1/kafka-logs #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 num.partitions=3 #默認的分區數,一個topic默認1個分區數 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 transaction.state.log.replication.factor=2 transaction.state.log.min.isr=1 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除 zookeeper.connect=192.168.118.14:2181,192.168.118.15:2181,192.168.118.16:2181 #設置zookeeper的連接端口 zookeeper.connection.timeout.ms=6000 # zookeeper leader切換連接時間,默認 6秒 group.initial.rebalance.delay.ms=0
啟動 kafka 集群並測試
1. 啟動服務(3個節點都需要啟動)
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-server-start.sh -daemon ../config/server.properties [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# jps -m 21088 Jps -m 11650 QuorumPeerMain ../config/zookeeper.properties 21020 Kafka ../config/server.properties
2. 創建 topic 來驗證是否創建成功
創建 topic [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --create --zookeeper 192.168.118.14:2181 --replication-factor 3 --partitions 3 --topic superman Created topic "superman". # 解釋 --replication-factor 2 #復制3份 --partitions 1 #創建3個分區 --topic #主題為 superman 在一台服務器上創建一個發布者 [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-console-producer.sh --broker-list 192.168.118.14:9092 --topic superman 在一台服務器上創建一個消費者 [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.118.14:9092 --topic superman --from-beginning
測試如下圖:
其他命令
查看 topic
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 __consumer_offsets superman
查看 topic 狀態
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic superman Topic:superman PartitionCount:3 ReplicationFactor:3 Configs: Topic: superman Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: superman Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: superman Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
5. kafka 監控工具
這里記錄兩種監控工具的搭建:
1. kafka manager
2. kafkaoffsetmonitor
kafka manager 主要用來管理 kafka 集群,kafkaoffsetmonitor 主要用來實時監控消費者信息。
5.1 kafka manager 安裝
kafka manager 下載地址:https://github.com/yahoo/kafka-manager 可以下載源碼自行編譯。這里使用已經編譯好的包直接搭建。
注意:上面下載的是源碼,下載后需要按照后面步驟進行編譯。如果覺得麻煩,可以直接從下面地址下載編譯好的 kafka-manager-1.3.3.7.zip。 鏈接:https://pan.baidu.com/s/1qYifoa4 密碼:el4o
(1)首先安裝 jdk
[root@192.168.118.17 ~]#tar xf jdk-8u77-linux-x64.tar.gz -C /usr/local/ [root@192.168.118.17 ~]#vim /etc/profile # 追加如下: JAVA_HOME=/usr/local/jdk1.8.0_77 JAVA_BIN=$JAVA_HOME/bin PATH=$PATH:$JAVA_BIN CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@192.168.118.17 ~]#source /etc/profile [root@192.168.118.17 ~]#ln -vs /usr/local/jdk1.8.0_77/bin/java /usr/bin/ ‘/usr/bin/java’ -> ‘/usr/local/jdk1.8.0_77/bin/java’
(2)解壓配置 kafka manager
修改 kafka-manager.zkhosts
嘗試啟動 kafka manager
[root@192.168.118.17 /opt/kafka-manager-1.3.3.7/conf]#cd ../bin/ [root@192.168.118.17 /opt/kafka-manager-1.3.3.7/bin]#./kafka-manager -Dconfig.file=../conf/application.conf -Dhttp.port=8080 &
啟動成功。
瀏覽器訪問:
(3)配置 kafka manager
點擊保存后,如果kafka 沒有開啟 JMX_PORT 會出現如下 kafka manager 日志報錯信息:
[error] k.m.a.c.BrokerViewCacheActor - Failed to get broker topic segment metrics for BrokerIdentity(1,192.168.118.15,9092,-1,false) java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled!
解決辦法:
修改 kafka-server-start.sh
增加 JMX 的端口信息
修改kafka-run-class.sh
增加綠色部分,注意對應的 ip地址
三個節點都需要修改,修改完畢重啟 kafka 服務, kafka manager 如下:
5.2 KafkaOffsetMonitor
KafkaOffsetMonitor 下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases 建議使用 v0.2.0版本,實測 v0.2.1版本存在BUG
KafkaOffsetMonitor 的使用很簡單,下載下來直接啟動就OK
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#ls kafka-monitor-start.sh KafkaOffsetMonitor-assembly-0.2.0.jar [root@192.168.118.17 /opt/kafkaoffsetmonitor]#cat kafka-monitor-start.sh nohup java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk 192.168.118.14:2181,192.168.118.15:2181,192.168.118.16:2181 --port 8088 --refresh 5.minutes --retain 1.day &
kafka-monitor-start.sh 為啟動腳本,需要手動填寫,參數解釋如下:
參數說明:
zk :zookeeper主機地址,如果有多個,用逗號隔開
port :應用程序端口
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認為offsetapp
啟動服務
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#chmod +x kafka-monitor-start.sh [root@192.168.118.17 /opt/kafkaoffsetmonitor]#./kafka-monitor-start.sh [root@192.168.118.17 /opt/kafkaoffsetmonitor]#nohup: appending output to ‘nohup.out’ [root@192.168.118.17 /opt/kafkaoffsetmonitor]#lsof -i :8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 20884 root 6u IPv6 40838 0t0 TCP *:radan-http (LISTEN)
配置安裝成功。