作者:bai_nian_min_guo
https://www.cnblogs.com/bainianminguo/p/12247158.html
一、kafka概述
1.1、定義
Kakfa是一個分布式的基於發布/訂閱模式的消息隊列(message queue),主要應用於大數據的實時處理領域
1.2、消息隊列
1.2.1、傳統的消息隊列&新式的消息隊列的模式
上面是傳統的消息隊列,比如一個用戶要注冊信息,當用戶信息寫入數據庫后,后面還有一些其他流程,比如發送短信,則需要等這些流程處理完成后,在返回給用戶
而新式的隊列是,比如一個用戶注冊信息,數據直接丟進數據庫,就直接返回給用戶成功
1.2.2、使用消息隊列的好處
A、 解耦
B、 可恢復性
C、 緩沖
D、 靈活性&峰值處理能力
E、 異步通信
1.2.3、消息隊列的模式
A、點對點模式
消息生產者發送消息到消息隊列中,然后消息消費者從隊列中取出並且消費消息,消息被消費后,隊列中不在存儲。所以消息消費者不可能消費到已經被消費的消息;隊列支持存在多個消費者,但是對於一個消息而言,只會 有一個消費者可以消費;如果想發給多個消費者,則需要多次發送該條消息
B】發布/訂閱模式(一對多,消費者消費數據之后不會清除消息)
消息生產者將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息,和點對點的方式不同,發布到topic的消息會被所有的訂閱者消費;但是數據保留是期限的,默認是7天,因為他不是存儲系統;kafka就是這種模式的;有兩種方式,一種是是消費者去主動去消費(拉取)消息,而不是生產者推送消息給消費者;另外一種就是生產者主動推送消息給消費者,類似公眾號
1.3、kafka的基礎架構
kafka的基礎架構主要有broker、生產者、消費者組構成,當前還包括zookeeper
生產者負責發送消息
broker負責緩沖消息,broker中可以創建topic,每個topic又有partition和replication的概念
消費者組負責處理消息,同一個消費者組的中消費者不能消費同一個partition中的數據,消費者組主要是提高消費能力,比如之前是一個消費者消費100條數據,現在是2個消費者消費100條數據,可以提高消費能力;所以消費者組的消費者的個數要小於partition的個數,不然就會有消費者沒有partition可以消費,造成資源的浪費
注:但是不同的消費者組的消費者是可以消費相同的partition數據
Kakfa如果要組件集群,則只需要注冊到一個zk中就可以了,zk中還保留消息消費的進度或者說偏移量或者消費位置
0.9版本之前偏移量存儲在zk
0.9版本之后偏移量存儲在kafka中,kafka定義了一個系統的topic,專用用來存儲偏移量的數據;
為什么要改?主要是考慮到頻繁更改偏移量,對zk__的壓力較大,而且kafka__本身自己的處理也較復雜
1.4、kafka安裝
A、Kafka的安裝只需要解壓安裝包就可以完成安裝
tar -zxvf kafka_2.11 -2.1.1.tgz -C /usr/local/
B、查看配置文件
[root@es1 config]# pwd
/usr/local/kafka/config
[root@es1 config]# ll
total 84
-rw-r--r--. 1 root root 906 Feb 8 2019 connect-console-sink.properties
-rw-r--r--. 1 root root 909 Feb 8 2019 connect-console-source.properties
-rw-r--r--. 1 root root 5321 Feb 8 2019 connect-distributed.properties
-rw-r--r--. 1 root root 883 Feb 8 2019 connect-file-sink.properties
-rw-r--r--. 1 root root 881 Feb 8 2019 connect-file-source.properties
-rw-r--r--. 1 root root 1111 Feb 8 2019 connect-log4j.properties
-rw-r--r--. 1 root root 2262 Feb 8 2019 connect-standalone.properties
-rw-r--r--. 1 root root 1221 Feb 8 2019 consumer.properties
-rw-r--r--. 1 root root 4727 Feb 8 2019 log4j.properties
-rw-r--r--. 1 root root 1925 Feb 8 2019 producer.properties
-rw-r--r--. 1 root root 6865 Jan 16 22:00 server-1.properties
-rw-r--r--. 1 root root 6865 Jan 16 22:00 server-2.properties
-rw-r--r--. 1 root root 6873 Jan 16 03:57 server.properties
-rw-r--r--. 1 root root 1032 Feb 8 2019 tools-log4j.properties
-rw-r--r--. 1 root root 1169 Feb 8 2019 trogdor.conf
-rw-r--r--. 1 root root 1023 Feb 8 2019 zookeeper.properties
C、修改配置文件server.properties
設置broker.id 這個是kafka集群區分每個節點的唯一標志符
D、設置kafka的數據存儲路徑
注:這個目錄下不能有其他非kafka的目錄,不然會導致kafka集群無法啟動
E、設置是否可以刪除topic,默認情況先kafka的topic是不允許刪除的
F、Kafka的數據保留的時間,默認是7天
G、Log文件最大的大小,如果log文件超過1g會創建一個新的文件
H、Kafka連接的zk的地址和連接kafka的超時時間
J、默認的partition的個數
推薦閱讀:6個步驟,全方位掌握 Kafka。
1.5、啟動kafka
A、啟動方式1,kafka只能單節點啟動,所以每個kakfa節點都需要手動啟動,下面的方式阻塞的方式啟動
B、啟動方式2,守護的方式啟動,推薦
1.6、kafka操作
A、查看當前kafka集群已有的topic
注意:這里連接的zookeeper,而不是連接的kafka
B、創建topic,指定分片和副本個數
注:
replication-factor:副本數
replication-factor:分區數
Topic:主題名
如果當前kafka集群只有3個broker節點,則replication-factor最大就是3了,下面的例子創建副本為4,則會報錯
C、刪除topic
D、查看topic信息
1.7、啟動生產者生產消息,kafka自帶一個生產者和消費者的客戶端
A、啟動一個生產者,注意此時連的9092端口,連接的kafka集群
B、啟動一個消費者,注意此時連接的還是9092端口,在0.9版本之前連接的還是2181端口
這里我們啟動2個消費者來測試一下
注:如果不指定的消費者組的配置文件的話,默認每個消費者都屬於不同的消費者組
C、發送消息,可以看到每個消費者都能收到消息
D、Kakfa中的實際的數據
二、kafka架構深入
Kafka不能保證消息的全局有序,只能保證消息在partition內有序,因為消費者消費消息是在不同的partition中隨機的
2.1、kafka的工作流程
Kafka中的消息是以topic進行分類的,生產者生成消息,消費者消費消息,都是面向topic的
Topic是一個邏輯上的概念,而partition是物理上的概念
每個partition又有副本的概念
每個partition對應於一個log文件,該log文件中存儲的就是生產者生成的數據,生產者生成的數據會不斷的追加到該log的文件末端,且每條數據都有自己的offset,消費者都會實時記錄自己消費到了那個offset,以便出錯的時候從上次的位置繼續消費,這個offset就保存在index文件中
kafka的offset是分區內有序的,但是在不同分區中是無順序的,kafka不保證數據的全局有序
2.2、kafka原理
由於生產者生產的消息會不斷追加到log文件的末尾,為防止log文件過大導致數據定位效率低下,Kafka采用分片和索引的機制,將每個partition分為多個segment,每個segment對應2個文件----index文件和log文件,這2個文件位於一個相同的文件夾下,文件夾的命名規則為topic名稱+分區序號
Indx和log的文件的文件名是當前這個索引是最小的數據的offset
Kafka如何快速的消費數據呢?
Index文件中存儲的數據的索引信息,第一列是offset,第二列這這個數據所對應的log文件中的偏移量,就像我們去讀文件,使用seek()設置當前鼠標的位置一樣,可以更快的找到數據
如果要去消費offset為3的數據,首先通過二分法找到數據在哪個index文件中,然后在通過index中offset找到數據在log文件中的offset;這樣就可以快速的定位到數據,並消費
所以kakfa雖然把數據存儲在磁盤中,但是他的讀取速度還是非常快的。
關注微信公眾號:Java技術棧,在后台回復:架構,可以獲取我整理的 N 篇架構干貨。
三、kafka的生產者和消費者
3.1、kafka的生產者
Kafka的partition的分區的作用
Kafka的分區的原因主要就是提供並發提高性能,因為讀寫是partition為單位讀寫的;
那生產者發送消息是發送到哪個partition中呢?
A、在客戶端中指定partition
B、輪詢(推薦)消息1去p1,消息2去p2,消息3去p3,消息4去p1,消息5去p2,消息6去p3 。。。。。。。
3.2 kafka如何保證數據可靠性呢?通過ack來保證
為保證生產者發送的數據,能可靠的發送到指定的topic,topic的每個partition收到生產者發送的數據后,都需要向生產者發送ack(確認收到),如果生產者收到ack,就會進行下一輪的發送,否則重新發送數據
那么kafka什么時候向生產者發送ack
確保follower和leader同步完成,leader在發送ack給生產者,這樣才能確保leader掛掉之后,能再follower中選舉出新的leader后,數據不會丟失
那多少個follower同步完成后發送ack
方案1:半數已經完成同步,就發送ack
方案2:全部完成同步,才發送ack(kafka采用這種方式)
采用第二種方案后,設想以下場景,leader收到數據,所有的follower都開始同步數據,但是有一個follower因為某種故障,一直無法完成同步,那leader就要一直等下,直到他同步完成,才能發送ack,這樣就非常影響效率,這個問題怎么解決?
Leader維護了一個動態的ISR列表(同步副本的作用),只需要這個列表的中的follower和leader同步;當ISR中的follower完成數據的同步之后,leader就會給生產者發送ack,如果follower長時間未向leader同步數據,則該follower將被剔除ISR,這個時間閾值也是自定義的;同樣leader故障后,就會從ISR中選舉新的leader
怎么選擇ISR的節點呢?
首先通信的時間要快,要和leader要可以很快的完成通信,這個時間默認是10s
然后就看leader數據差距,消息條數默認是10000條(后面版本被移除)
為什么移除:因為kafka發送消息是批量發送的,所以會一瞬間leader接受完成,但是follower還沒有拉取,所以會頻繁的踢出加入ISR,這個數據會保存到zk和內存中,所以會頻繁的更新zk和內存。
但是對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等ISR中的follower全部接受成功
所以kafka為用戶提供了三種可靠性級別,用戶可以根據可靠性和延遲進行權衡,這個設置在kafka的生成中設置:acks參數設置
A、acks為0
生產者不等ack,只管往topic丟數據就可以了,這個丟數據的概率非常高
B、ack為1
Leader落盤后就會返回ack,會有數據丟失的現象,如果leader在同步完成后出現故障,則會出現數據丟失
C、ack為-1(all)
Leader和follower(ISR)落盤才會返回ack,會有數據重復現象,如果在leader已經寫完成,且follower同步完成,但是在返回ack的出現故障,則會出現數據重復現象;極限情況下,這個也會有數據丟失的情況,比如follower和leader通信都很慢,所以ISR中只有一個leader節點,這個時候,leader完成落盤,就會返回ack,如果此時leader故障后,就會導致丟失數據
3.3 Kafka如何保證消費數據的一致性?通過HW來保證
LEO:指每個follower的最大的offset
HW(高水位):指消費者能見到的最大的offset,LSR隊列中最小的LEO,也就是說消費者只能看到1~6的數據,后面的數據看不到,也消費不了
避免leader掛掉后,比如當前消費者消費8這條數據后,leader掛 了,此時比如f2成為leader,f2根本就沒有9這條數據,那么消費者就會報錯,所以設計了HW這個參數,只暴露最少的數據給消費者,避免上面的問題
3.3.1、HW保證數據存儲的一致性
A、Follower故障
Follower發生故障后會被臨時提出LSR,待該follower恢復后,follower會讀取本地的磁盤記錄的上次的HW,並將該log文件高於HW的部分截取掉,從HW開始想leader進行同步,等該follower的LEO大於等於該Partition的hw,即follower追上leader后,就可以重新加入LSR
B、Leader故障
Leader發生故障后,會從ISR中選出一個新的leader,之后,為了保證多個副本之間的數據一致性,其余的follower會先將各自的log文件高於hw的部分截掉(新leader自己不會截掉),然后從新的leader同步數據
注意:這個是為了保證多個副本間的數據存儲的一致性,並不能保證數據不丟失或者不重復
3.3.2精准一次(冪等性),保證數據不重復
Ack設置為-1,則可以保證數據不丟失,但是會出現數據重復(at least once)
Ack設置為0,則可以保證數據不重復,但是不能保證數據不丟失(at most once)
但是如果魚和熊掌兼得,該怎么辦?這個時候就就引入了Exactl once(精准一次)
在0.11版本后,引入冪等性解決kakfa集群內部的數據重復,在0.11版本之前,在消費者處自己做處理
如果啟用了冪等性,則ack默認就是-1,kafka就會為每個生產者分配一個pid,並未每條消息分配seqnumber,如果pid、partition、seqnumber三者一樣,則kafka認為是重復數據,就不會落盤保存;但是如果生產者掛掉后,也會出現有數據重復的現象;所以冪等性解決在單次會話的單個分區的數據重復,但是在分區間或者跨會話的是數據重復的是無法解決的
3.4 kafka的消費者
3.4.1 消費方式
消息隊列有兩種消費消息的方式,push(微信公眾號)、pull(kafka),push模式很難適應消費速率不同的消費者,因為消費發送速率是由broker決定的,他的目標是盡可能以最快的的速度傳遞消息,但是這樣很容易造成消費者來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull的方式可以消費者的消費能力以適當的速率消費消息
Pull的模式不足之處是如果kafka沒有數據,消費者可能會陷入死循環,一直返回空數據,針對這一點,kafka的消費者在消費數據時候回傳遞一個timeout參數,如果當時沒有數據可供消費,消費者會等待一段時間在返回
3.4.2 分區分配策略
一個消費者組有多個消費者,一個topic有多個partition。所以必然會涉及到partition的分配問題,即確定哪個partition由哪個消費者來消費
Kafka提供兩種方式,一種是輪詢(RountRobin)對於topic組生效,一種是(Range)對於單個topic生效
輪訓:前置條件是需要一個消費者里的消費者訂閱的是相同的topic。不然就會出現問題;非默認的的方式
同一個消費者組里的消費者不能同時消費同一個分區
比如三個消費者消費一個topic的9個分區
如果一個消費者組里有2個消費者,這個消費者組里同時消費2個topic,每個topic又有三個partition
首先會把2個topic當做一個主題,然后根據topic和partition做hash,然后在按照hash排序。然后輪訓分配給一個消費者組中的2個消費者
如果是下面這樣的方式訂閱的呢?
比如有3個topic,每個topic有3個partition,一個消費者組中有2個消費者。消費者1訂閱topic1和topic2,消費者2訂閱topic2和topic3,那么這樣的場景,使用輪訓的方式訂閱topic就會有問題
如果是下面這種方式訂閱呢
比如有2個topic,每個topic有3個partition,一個消費者組 有2個消費者,消費者1訂閱topic1,消費者2訂閱topic2,這樣使用輪訓的方式訂閱topic也會有問題
所以我們一直強調,使用輪訓的方式訂閱topic的前提是一個消費者組中的所有消費者訂閱的主題是一樣的;
所以輪訓的方式不是kafka默認的方式
Range:是按照單個topic來划分的,默認的分配方式
Range的問題會出現消費者數據不均衡的問題
比如下面的例子,一個消費者組訂閱了2個topic,就會出現消費者1消費4個partition,而另外一個消費者只消費2個partition
分區策略什么時候會觸發呢?當消費者組里的消費者個數變化的時候,會觸發分區策略調整,比如消費者里增加消費者,或者減少消費者
3.4.3 offset的維護
由於消費者在消費過程中可能會出現斷電宕機等故障,消費者恢復后,需要從故障前的位置繼續消費,所以消費者需要實施記錄自己消費哪個offset,以便故障恢復后繼續消費
Offset保存的位置有2個,一個zk,一個是kafka
首先看下offset保存到zk
由消費者組、topic、partition三個元素確定唯一的offset
所以消費者組中的某個消費者掛掉之后,或者的消費者還是可以拿到這個offset的
Controller這個節點和zk通信,同步數據,這個節點就是誰先起來,誰就先注冊controller,誰就是controller。其他節點和controller信息保持同步
3.4.5、消費者組的案例
修改消費者組id
啟動一個消費者發送3條數據
指定消費者組啟動消費者,啟動三個消費者,可以看到每個消費者消費了一條數據
在演示下不同組可以消費同一個topic的,我們看到2個消費者的消費者都消費到同一條數據
再次啟動一個消費者,這個消費者屬於另外一個消費者組
四、Kafka的高效讀寫機制
4.1、分布式部署
多節點並行操作
4.2、順序寫磁盤
Kafka的producer生產數據,要寫入到log文件中,寫的過程中一直追加到文件末尾,為順序寫,官網有數據表明。同樣的磁盤,順序寫能到600M/S,而隨機寫只有100K/S。這與磁盤的機械結構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間
4.3、零復制技術
正常情況下,先把數據讀到內核空間,在從內核空間把數據讀到用戶空間,然后在調操作系統的io接口寫到內核空間,最終在寫到硬盤中
Kafka是這樣做的,直接在內核空間流轉io流,所以kafka的性能非常高
五、 zookeeper在kafka中的作用
Kafka集群中有一個broker會被選舉為controller,負責管理集群broker的上下線,所有的topic的分區副本分配和leader選舉等工作。
推薦去我的博客閱讀更多:
2.Spring MVC、Spring Boot、Spring Cloud 系列教程
3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程
生活很美好,明天見~