基礎架構及術語
話不多說,先看圖,通過這張圖我們來捋一捋相關的概念及之間的關系:
如果看到這張圖你很懵逼,木有關系!我們先來分析相關概念
Producer:Producer即生產者,消息的產生者,是消息的入口。
kafka cluster:
Broker:Broker是kafka實例,每個服務器上有一個或多個kafka的實例,我們姑且認為每個broker對應一台服務器。每個kafka集群內的broker都有一個不重復的編號,如圖中的broker-0、broker-1等……
Topic:消息的主題,可以理解為消息的分類,kafka的數據就保存在topic。在每個broker上都可以創建多個topic。
Partition:Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。同一個topic在不同的分區的數據是不重復的,partition的表現形式就是一個一個的文件夾!
Replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數量是10個,且副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也只可能存放一個副本(包括自己)。
Message:每一條發送的消息主體。
Consumer:消費者,即消息的消費方,是消息的出口。
Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設計中同一個分區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的數據,這也是為了提高kafka的吞吐量!
Zookeeper:kafka集群依賴zookeeper來保存集群的的元信息,來保證系統的可用性。
Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。
zookeeper是用來管理broker和consumer的。
broker:當創建broker之后,向zookeeper注冊broker信息。Kafka Broker節點一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。
Consumergroup:各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,如果一個message可以被多個consumer(consumer 線程)消費的話,那么這些consumer必須在不同的組。
Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啟動一個新的consumer group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)並發處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consumer的性能下降,吞吐量不夠。
而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業務都需要這個topic的數據,起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
當啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那么這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。
Zookeeper會給每個consumer group一個ID,即同一份數據可以被不同的用戶ID多次消費。因此這就是單播與多播的實現。以單個消費者還是以組別的方式去消費數據,由用戶自己去定義。Zookeeper管理consumer的offset跟蹤當前消費的offset。
再說明一點:zookeeper不管理producer,只是提供當前broker的相關信息,比如topic中的partition信息。在新版本的Kafka中,consumer也不需要zookeeper管理了。
下面用一張圖看一下Kafka的分區:
工作流程分析
上面介紹了kafka的基礎架構及基本概念,不知道大家看完有沒有對kafka有個大致印象,如果對還比較懵也沒關系!我們接下來再結合上面的結構圖分析kafka的工作流程,最后再回來整個梳理一遍我相信你會更有收獲!
發送數據
我們看上面的架構圖中,producer就是生產者,是數據的入口。注意看圖中的紅色箭頭,Producer在寫入數據的時候永遠的找leader,不會直接將數據寫入follower!那leader怎么找呢?寫入的流程又是什么樣的呢?我們看下圖:
發送的流程就在圖中已經說明了,就不單獨在文字列出來了!需要注意的一點是,消息寫入leader后,follower是主動的去leader進行同步的!producer采用push模式將數據發布到broker,每條消息追加到分區中,順序寫入磁盤,所以保證同一分區內的數據是有序的!寫入示意圖如下:
上面說到數據會寫入到不同的分區,那kafka為什么要做分區呢?相信大家應該也能猜到,分區的主要目的是:
1、 方便擴展。因為一個topic可以有多個partition,所以我們可以通過擴展機器去輕松的應對日益增長的數據量。
2、 提高並發。以partition為讀寫單位,可以多個消費者同時消費數據,提高了消息的處理效率。
熟悉負載均衡的朋友應該知道,當我們向某個服務器發送請求的時候,服務端可能會對請求做一個負載,將流量分發到不同的服務器,那在kafka中,如果某個topic有多個partition,producer又怎么知道該將數據發往哪個partition呢?kafka中有幾個原則:
1、 partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
2、 如果沒有指定partition,但是設置了數據的key,則會根據key的值hash出一個partition。
3、 如果既沒指定partition,又沒有設置key,則會輪詢選出一個partition。
保證消息不丟失是一個消息隊列中間件的基本保證,那producer在向kafka寫入消息的時候,怎么保證消息不丟失呢?其實上面的寫入流程圖中有描述出來,那就是通過ACK應答機制!在生產者向隊列寫入數據的時候可以設置參數來確定是否確認kafka接收到數據,這個參數可設置的值為0、1、all。
0代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效率最高。
1代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
all代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保leader發送成功和所有的副本都完成備份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的topic寫數據,能不能寫入成功呢?kafka會自動創建topic,分區和副本的數量根據默認配置都是1。
保存數據
Producer將數據寫入kafka后,集群就需要對數據進行保存了!kafka將數據保存在磁盤,可能在我們的一般的認知里,寫入磁盤是比較耗時的操作,不適合這種高並發的組件。Kafka初始會單獨開辟一塊磁盤空間,順序寫入數據(效率比隨機寫入高)。
Partition 結構
前面說過了每個topic都可以分為一個或多個partition,如果你覺得topic比較抽象,那partition就是比較具體的東西了!Partition在服務器上的表現形式就是一個一個的文件夾,每個partition的文件夾下面會有多組segment文件,每組segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中沒有)三個文件, log文件就實際是存儲message的地方,而index和timeindex文件為索引文件,用於檢索消息。
如上圖,這個partition有三組segment文件,每個log文件的大小是一樣的,但是存儲的message數量是不一定相等的(每條的message大小不一致)。文件的命名是以該segment最小offset來命名的,如000.index存儲offset為0~368795的消息,kafka就是利用分段+索引的方式來解決查找效率的問題。
Message結構
上面說到log文件就實際是存儲message的地方,我們在producer往kafka寫入的也是一條一條的message,那存儲在log中的message是什么樣子的呢?消息主要包含消息體、消息大小、offset、壓縮類型……等等!我們重點需要知道的是下面三個:
1、 offset:offset是一個占8byte的有序id號,它可以唯一確定每條消息在parition內的位置!
2、 消息大小:消息大小占用4byte,用於描述消息的大小。
3、 消息體:消息體存放的是實際的消息數據(被壓縮過),占用的空間根據具體的消息而不一樣。
存儲策略
無論消息是否被消費,kafka都會保存所有的消息。那對於舊數據有什么刪除策略呢?
1、 基於時間,默認配置是168小時(7天)。
2、 基於大小,默認配置是1073741824。
需要注意的是,kafka讀取特定消息的時間復雜度是O(1),所以這里刪除過期的文件並不會提高kafka的性能!
消費數據
消息存儲在log文件后,消費者就可以進行消費了。與生產消息相同的是,消費者在拉取消息的時候也是找leader去拉取。
多個消費者可以組成一個消費者組(consumer group),每個消費者組都有一個組id!同一個消費組者的消費者可以消費同一topic下不同分區的數據,但是不會組內多個消費者消費同一分區的數據!!!是不是有點繞。我們看下圖:
圖示是消費者組內的消費者小於partition數量的情況,所以會出現某個消費者消費多個partition數據的情況,消費的速度也就不及只處理一個partition的消費者的處理速度!如果是消費者組的消費者多於partition的數量,那會不會出現多個消費者消費同一個partition的數據呢?上面已經提到過不會出現這種情況!多出來的消費者不消費任何partition的數據。所以在實際的應用中,建議消費者組的consumer的數量與partition的數量一致!
在保存數據的小節里面,我們聊到了partition划分為多組segment,每個segment又包含.log、.index、.timeindex文件,存放的每條message包含offset、消息大小、消息體……我們多次提到segment和offset,查找消息的時候是怎么利用segment+offset配合查找的呢?假如現在需要查找一個offset為368801的message是什么樣的過程呢?我們先看看下面的圖:
1、 先找到offset的368801message所在的segment文件(利用二分法查找),這里找到的就是在第二個segment文件。
2、 打開找到的segment中的.index文件(也就是368796.index文件,該文件起始偏移量為368796+1,我們要查找的offset為368801的message在該index內的偏移量為368796+5=368801,所以這里要查找的相對offset為5)。由於該文件采用的是稀疏索引的方式存儲着相對offset及對應message物理偏移量的關系,所以直接找相對offset為5的索引找不到,這里同樣利用二分法查找相對offset小於或者等於指定的相對offset的索引條目中最大的那個相對offset,所以找到的是相對offset為4的這個索引。
3、 根據找到的相對offset為4的索引確定message存儲的物理偏移位置為256。打開數據文件,從位置為256的那個地方開始順序掃描直到找到offset為368801的那條Message。
這套機制是建立在offset為有序的基礎上,利用segment+有序offset+稀疏索引+二分查找+順序查找等多種手段來高效的查找數據!至此,消費者就能拿到需要處理的數據進行處理了。那每個消費者又是怎么記錄自己消費的位置呢?在早期的版本中,消費者將消費到的offset維護zookeeper中,consumer每間隔一段時間上報一次,這里容易導致重復消費,且性能不好!在新的版本中消費者消費到的offset已經直接維護在kafk集群的__consumer_offsets這個topic中!