參考:
https://www.cnblogs.com/sujing/p/10960832.html
https://blog.csdn.net/suifeng3051/article/details/48053965
https://blog.csdn.net/u013573133/article/details/48142677
再過半小時,你就能明白kafka的工作原理了
本文在個人技術博客不同步發布,詳情可猛戳
亦可掃描屏幕右側二維碼關注個人公眾號,公眾號內有個人聯系方式,等你來撩...
為什么需要消息隊列
周末無聊刷着手機,某寶網APP突然蹦出來一條消息“為了回饋老客戶,女朋友買一送一,活動僅限今天!”。買一送一還有這種好事,那我可不能錯過!忍不住立馬點了去。於是選了兩個最新款,下單、支付一氣呵成!滿足的躺在床上,想着馬上有女朋友了,竟然幸福的失眠了……
第二天正常上着班,突然接到快遞小哥的電話:
小哥:“你是xx嗎?你的女朋友到了,我現在在你樓下,你來拿一下吧!”。
我:“這……我在上班呢,可以晚上送過來嗎?“。
小哥:“晚上可不行哦,晚上我也下班了呢!”。
於是兩個人僵持了很久……
最后小哥說,要不我幫你放到樓下小芳便利店吧,你晚上下班了過來拿,尷尬的局面這才得以緩解!
回到正題,如果沒有小芳便利店,那快遞小哥和我的交互圖就應該如下:
會出現什么情況呢?
1、為了這個女朋友,我請假回去拿(老板不批)。
2、小哥一直在你樓下等(小哥還有其他的快遞要送)。
3、周末再送(顯然等不及)。
4、這個女朋友我不要了(絕對不可能)!
小芳便利店出現后,交互圖就應如下:
在上面例子中,“快遞小哥”和“買女朋友的我”就是需要交互的兩個系統,小芳便利店就是我們本文要講的-“消息中間件”。總結下來小芳便利店(消息中間件)出現后有如下好處:
1、 解耦
快遞小哥手上有很多快遞需要送,他每次都需要先電話一一確認收貨人是否有空、哪個時間段有空,然后再確定好送貨的方案。這樣完全依賴收貨人了!如果快遞一多,快遞小哥估計的忙瘋了……如果有了便利店,快遞小哥只需要將同一個小區的快遞放在同一個便利店,然后通知收貨人來取貨就可以了,這時候快遞小哥和收貨人就實現了解耦!
2、 異步
快遞小哥打電話給我后需要一直在你樓下等着,直到我拿走你的快遞他才能去送其他人的。快遞小哥將快遞放在小芳便利店后,又可以干其他的活兒去了,不需要等待你到來而一直處於等待狀態。提高了工作的效率。
3、 削峰
假設雙十一我買了不同店里的各種商品,而恰巧這些店發貨的快遞都不一樣,有中通、圓通、申通、各種通等……更巧的是他們都同時到貨了!中通的小哥打來電話叫我去北門取快遞、圓通小哥叫我去南門、申通小哥叫我去東門。我一時手忙腳亂……
我們能看到在系統需要交互的場景中,使用消息隊列中間件真的是好處多多,基於這種思路,就有了豐巢、菜鳥驛站等比小芳便利店更專業的“中間件”了。
最后,上面的故事純屬虛構……
消息隊列通信的模式
通過上面的例子我們引出了消息中間件,並且介紹了消息隊列出現后的好處,這里就需要介紹消息隊列通信的兩種模式了:
一、 點對點模式
如上圖所示,點對點模式通常是基於拉取或者輪詢的消息傳送模型,這個模型的特點是發送到隊列的消息被一個且只有一個消費者進行處理。生產者將消息放入消息隊列后,由消費者主動的去拉取消息進行消費。點對點模型的的優點是消費者拉取消息的頻率可以由自己控制。但是消息隊列是否有消息需要消費,在消費者端無法感知,所以在消費者端需要額外的線程去監控。
二、 發布訂閱模式
如上圖所示,發布訂閱模式是一個基於消息送的消息傳送模型,改模型可以有多種不同的訂閱者。生產者將消息放入消息隊列后,隊列會將消息推送給訂閱過該類消息的消費者(類似微信公眾號)。由於是消費者被動接收推送,所以無需感知消息隊列是否有待消費的消息!但是consumer1、consumer2、consumer3由於機器性能不一樣,所以處理消息的能力也會不一樣,但消息隊列卻無法感知消費者消費的速度!所以推送的速度成了發布訂閱模模式的一個問題!假設三個消費者處理速度分別是8M/s、5M/s、2M/s,如果隊列推送的速度為5M/s,則consumer3無法承受!如果隊列推送的速度為2M/s,則consumer1、consumer2會出現資源的極大浪費!
Kafka
上面簡單的介紹了為什么需要消息隊列以及消息隊列通信的兩種模式,接下來就到了我們本文的主角——kafka閃亮登場的時候了!Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據,具有高性能、持久化、多副本備份、橫向擴展能力……… 一些基本的介紹這里就不展開了,網上有太多關於這些的介紹了,讀者可以自行百度一下!
基礎架構及術語
話不多說,先看圖,通過這張圖我們來捋一捋相關的概念及之間的關系:
如果看到這張圖你很懵逼,木有關系!我們先來分析相關概念
Producer:Producer即生產者,消息的產生者,是消息的入口。
kafka cluster:
Broker:Broker是kafka實例,每個服務器上有一個或多個kafka的實例,我們姑且認為每個broker對應一台服務器。每個kafka集群內的broker都有一個不重復的編號,如圖中的broker-0、broker-1等……
Topic:消息的主題,可以理解為消息的分類,kafka的數據就保存在topic。在每個broker上都可以創建多個topic。
Partition:Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。同一個topic在不同的分區的數據是不重復的,partition的表現形式就是一個一個的文件夾!
Replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數量是10個,且副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也只可能存放一個副本(包括自己)。
Message:每一條發送的消息主體。
Consumer:消費者,即消息的消費方,是消息的出口。
Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設計中同一個分區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的數據,這也是為了提高kafka的吞吐量!
Zookeeper:kafka集群依賴zookeeper來保存集群的的元信息,來保證系統的可用性。
工作流程分析
上面介紹了kafka的基礎架構及基本概念,不知道大家看完有沒有對kafka有個大致印象,如果對還比較懵也沒關系!我們接下來再結合上面的結構圖分析kafka的工作流程,最后再回來整個梳理一遍我相信你會更有收獲!
發送數據
我們看上面的架構圖中,producer就是生產者,是數據的入口。注意看圖中的紅色箭頭,Producer在寫入數據的時候永遠的找leader,不會直接將數據寫入follower!那leader怎么找呢?寫入的流程又是什么樣的呢?我們看下圖:
發送的流程就在圖中已經說明了,就不單獨在文字列出來了!需要注意的一點是,消息寫入leader后,follower是主動的去leader進行同步的!producer采用push模式將數據發布到broker,每條消息追加到分區中,順序寫入磁盤,所以保證同一分區內的數據是有序的!寫入示意圖如下:
上面說到數據會寫入到不同的分區,那kafka為什么要做分區呢?相信大家應該也能猜到,分區的主要目的是:
1、 方便擴展。因為一個topic可以有多個partition,所以我們可以通過擴展機器去輕松的應對日益增長的數據量。
2、 提高並發。以partition為讀寫單位,可以多個消費者同時消費數據,提高了消息的處理效率。
熟悉負載均衡的朋友應該知道,當我們向某個服務器發送請求的時候,服務端可能會對請求做一個負載,將流量分發到不同的服務器,那在kafka中,如果某個topic有多個partition,producer又怎么知道該將數據發往哪個partition呢?kafka中有幾個原則:
1、 partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
2、 如果沒有指定partition,但是設置了數據的key,則會根據key的值hash出一個partition。
3、 如果既沒指定partition,又沒有設置key,則會輪詢選出一個partition。
保證消息不丟失是一個消息隊列中間件的基本保證,那producer在向kafka寫入消息的時候,怎么保證消息不丟失呢?其實上面的寫入流程圖中有描述出來,那就是通過ACK應答機制!在生產者向隊列寫入數據的時候可以設置參數來確定是否確認kafka接收到數據,這個參數可設置的值為0、1、all。
0代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效率最高。
1代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
all代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保leader發送成功和所有的副本都完成備份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的topic寫數據,能不能寫入成功呢?kafka會自動創建topic,分區和副本的數量根據默認配置都是1。
保存數據
Producer將數據寫入kafka后,集群就需要對數據進行保存了!kafka將數據保存在磁盤,可能在我們的一般的認知里,寫入磁盤是比較耗時的操作,不適合這種高並發的組件。Kafka初始會單獨開辟一塊磁盤空間,順序寫入數據(效率比隨機寫入高)。
Partition 結構
前面說過了每個topic都可以分為一個或多個partition,如果你覺得topic比較抽象,那partition就是比較具體的東西了!Partition在服務器上的表現形式就是一個一個的文件夾,每個partition的文件夾下面會有多組segment文件,每組segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中沒有)三個文件, log文件就實際是存儲message的地方,而index和timeindex文件為索引文件,用於檢索消息。
如上圖,這個partition有三組segment文件,每個log文件的大小是一樣的,但是存儲的message數量是不一定相等的(每條的message大小不一致)。文件的命名是以該segment最小offset來命名的,如000.index存儲offset為0~368795的消息,kafka就是利用分段+索引的方式來解決查找效率的問題。
Message結構
上面說到log文件就實際是存儲message的地方,我們在producer往kafka寫入的也是一條一條的message,那存儲在log中的message是什么樣子的呢?消息主要包含消息體、消息大小、offset、壓縮類型……等等!我們重點需要知道的是下面三個:
1、 offset:offset是一個占8byte的有序id號,它可以唯一確定每條消息在parition內的位置!
2、 消息大小:消息大小占用4byte,用於描述消息的大小。
3、 消息體:消息體存放的是實際的消息數據(被壓縮過),占用的空間根據具體的消息而不一樣。
存儲策略
無論消息是否被消費,kafka都會保存所有的消息。那對於舊數據有什么刪除策略呢?
1、 基於時間,默認配置是168小時(7天)。
2、 基於大小,默認配置是1073741824。
需要注意的是,kafka讀取特定消息的時間復雜度是O(1),所以這里刪除過期的文件並不會提高kafka的性能!
消費數據
消息存儲在log文件后,消費者就可以進行消費了。與生產消息相同的是,消費者在拉取消息的時候也是找leader去拉取。
多個消費者可以組成一個消費者組(consumer group),每個消費者組都有一個組id!同一個消費組者的消費者可以消費同一topic下不同分區的數據,但是不會組內多個消費者消費同一分區的數據!!!是不是有點繞。我們看下圖:
圖示是消費者組內的消費者小於partition數量的情況,所以會出現某個消費者消費多個partition數據的情況,消費的速度也就不及只處理一個partition的消費者的處理速度!如果是消費者組的消費者多於partition的數量,那會不會出現多個消費者消費同一個partition的數據呢?上面已經提到過不會出現這種情況!多出來的消費者不消費任何partition的數據。所以在實際的應用中,建議消費者組的consumer的數量與partition的數量一致!
在保存數據的小節里面,我們聊到了partition划分為多組segment,每個segment又包含.log、.index、.timeindex文件,存放的每條message包含offset、消息大小、消息體……我們多次提到segment和offset,查找消息的時候是怎么利用segment+offset配合查找的呢?假如現在需要查找一個offset為368801的message是什么樣的過程呢?我們先看看下面的圖:
1、 先找到offset的368801message所在的segment文件(利用二分法查找),這里找到的就是在第二個segment文件。
2、 打開找到的segment中的.index文件(也就是368796.index文件,該文件起始偏移量為368796+1,我們要查找的offset為368801的message在該index內的偏移量為368796+5=368801,所以這里要查找的相對offset為5)。由於該文件采用的是稀疏索引的方式存儲着相對offset及對應message物理偏移量的關系,所以直接找相對offset為5的索引找不到,這里同樣利用二分法查找相對offset小於或者等於指定的相對offset的索引條目中最大的那個相對offset,所以找到的是相對offset為4的這個索引。
3、 根據找到的相對offset為4的索引確定message存儲的物理偏移位置為256。打開數據文件,從位置為256的那個地方開始順序掃描直到找到offset為368801的那條Message。
這套機制是建立在offset為有序的基礎上,利用segment+有序offset+稀疏索引+二分查找+順序查找等多種手段來高效的查找數據!至此,消費者就能拿到需要處理的數據進行處理了。那每個消費者又是怎么記錄自己消費的位置呢?在早期的版本中,消費者將消費到的offset維護zookeeper中,consumer每間隔一段時間上報一次,這里容易導致重復消費,且性能不好!在新的版本中消費者消費到的offset已經直接維護在kafk集群的__consumer_offsets這個topic中!
最后,推薦極客時間上的一門不錯的架構師課程,走向架構師必備!
Kafka 設計與原理詳解
一、Kafka簡介
本文綜合了我之前寫的kafka相關文章,可作為一個全面了解學習kafka的培訓學習資料。
- 1
轉載請注明出處 : 本文鏈接
1.1 背景歷史
當今社會各種應用系統諸如商業、社交、搜索、瀏覽等像信息工廠一樣不斷的生產出各種信息,在大數據時代,我們面臨如下幾個挑戰:
- 如何收集這些巨大的信息
- 如何分析它
- 如何及時做到如上兩點
以上幾個挑戰形成了一個業務需求模型,即生產者生產(produce)各種信息,消費者消費(consume)(處理分析)這些信息,而在生產者與消費者之間,需要一個溝通兩者的橋梁-消息系統。從一個微觀層面來說,這種需求也可理解為不同的系統之間如何傳遞消息。
1.2 Kafka誕生
Kafka由 linked-in 開源
kafka-即是解決上述這類問題的一個框架,它實現了生產者和消費者之間的無縫連接。
kafka-高產出的分布式消息系統(A high-throughput distributed messaging system)
1.3 Kafka現在
Apache kafka 是一個分布式的基於push-subscribe的消息系統,它具備快速、可擴展、可持久化的特點。它現在是Apache旗下的一個開源系統,作為hadoop生態系統的一部分,被各種商業公司廣泛應用。它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。
二、Kafka技術概覽
2.1 Kafka的特性
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失
- 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高並發:支持數千個客戶端同時讀寫
2.2 Kafka一些重要設計思想
下面介紹先大體介紹一下Kafka的主要設計思想,可以讓相關人員在短時間內了解到kafka相關特性,如果想深入研究,后面會對其中每一個特性都做詳細介紹。
- Consumergroup:各個consumer可以組成一個組,每個消息只能被組中的一個consumer消費,如果一個消息可以被多個consumer消費的話,那么這些consumer必須在不同的組。
- 消息狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
- 消息持久化:Kafka中會把消息持久化到本地文件系統中,並且保持極高的效率。
- 消息有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。
- 批量發送:Kafka支持以消息集合為單位進行批量發送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產和消費是異步的。
- Kafka集群中broker之間的關系:不是主從關系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節點。
- 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
- 同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。
- 分區機制partition:Kafka的broker端支持消息分區,Producer可以決定把消息發到哪個分區,在一個分區中消息的順序就是Producer發送消息的順序,一個主題中可以有多個分區,具體分區的數量是可配置的。分區的意義很重大,后面的內容會逐漸體現。
- 離線數據裝載:Kafka由於對可拓展的數據持久化的支持,它也非常適合向Hadoop或者數據倉庫中進行數據裝載。
- 插件支持:現在不少活躍的社區已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。
2.3 kafka 應用場景
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
2.4 Kafka架構組件
Kafka中發布訂閱的對象是topic。我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
- topic:消息存放的目錄即主題
- Producer:生產消息到topic的一方
- Consumer:訂閱topic消費消息的一方
- Broker:Kafka的服務實例就是一個broker
2.5 Kafka Topic&Partition
消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:
我們可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
Kafka集群會保存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,只有過期的數據才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內的所有消息都會被保存到集群中,數據只有超過了兩天才會被清除。
Kafka需要維持的元數據只有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀態完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;第二就是可以提高並發,因為可以以Partition為單位讀寫了。
三、Kafka 核心組件
3.1 Replications、Partitions 和Leaders
通過上面介紹的我們可以知道,kafka中的數據是持久化的並且能夠容錯的。Kafka允許用戶為每個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。如果你的副本數量設置為3,那么一份數據就會被存放在3台不同的機器上,那么就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到數據消費。如果對數據持久化有更高的要求,可以把副本數量設置為3或者更多。
Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生產數據時,會按照一定規則(這個規則是可以自定義的)把消息發布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。
關於如何設置partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。所以,推薦partition的數量一定要大於同時運行的consumer的數量。另外一方面,建議partition的數量大於集群broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內存來緩存消息數據,如果partition數量越大,就要為kafka分配更大的heap space。
3.2 Producers
Producers直接發送消息到broker上的leader partition,不需要經過任何中介一系列的路由轉發。為了實現這個特性,kafka集群中的每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的。
Producer客戶端自己控制着消息被推送到哪些partition。實現的方式可以是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的分區,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個分區。
以Batch的方式推送數據可以極大的提高處理效率,kafka Producer 可以將消息在內存中累計到一定數量后作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設置為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設置需要在效率和時效性方面做一個權衡。
Producers可以異步的並行的向kafka發送消息,但是通常producer在發送完消息之后會得到一個future響應,返回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到確認的副本個數,如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這樣有可能會導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。
若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。
Kafka 消息有一個定長的header和變長的字節數組組成。因為kafka消息支持字節數組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。
3.3 Consumers
Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。
在kafka中,當前讀到消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數據。比如,consumer可以通過重設offset值來重新消費已消費過的數據。不管有沒有被消費,kafka會保存數據一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數據。
High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態,即每次消費的都是下一個消息。
High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。
四、Kafka核心特性
4.1 壓縮
我們上面已經知道了Kafka支持以集合(batch)為單位發送消息,在此基礎上,Kafka還支持對消息集合進行壓縮,Producer端可以通過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮之后,在Consumer端需進行解壓。壓縮的好處就是減少傳輸的數據量,減輕對網絡傳輸的壓力,在對大數據處理上,瓶頸往往體現在網絡上而不是CPU(壓縮和解壓會耗掉部分CPU資源)。
那么如何區分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節,這個字節的后兩位表示消息的壓縮采用的編碼,如果后兩位為0,則表示消息未被壓縮。
4.2消息可靠性
在消息系統中,保證消息在生產和消費過程中的可靠性是十分重要的,在實際消息傳遞過程中,可能會出現如下三中情況:
- 一個消息發送失敗
- 一個消息被發送多次
- 最理想的情況:exactly-once ,一個消息發送成功且僅發送了一次
有許多系統聲稱它們實現了exactly-once,但是它們其實忽略了生產者或消費者在生產和消費過程中有可能失敗的情況。比如雖然一個Producer成功發送一個消息,但是消息在發送途中丟失,或者成功發送到broker,也被consumer成功取走,但是這個consumer在處理取過來的消息時失敗了。
從Producer端看:Kafka是這么處理的,當一個消息被發送后,Producer會等待broker成功接收到消息的反饋(可通過參數控制等待時間),如果消息在途中丟失或是其中一個broker掛掉,Producer會重新發送(我們知道Kafka有備份機制,可以通過參數控制是否等待所有備份節點都收到消息)。
從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程中掛掉,此時Consumer可以通過這個offset值重新找到上一個消息再進行處理。Consumer還有權限控制這個offset值,對持久化到broker端的消息做任意處理。
4.3 備份機制
備份機制是Kafka0.8版本的新特性,備份機制的出現大大提高了Kafka集群的可靠性、穩定性。有了備份機制后,Kafka允許集群中的節點掛掉后而不影響整個集群工作。一個備份數量為n的集群允許n-1個節點失敗。在所有備份節點中,有一個節點作為lead節點,這個節點保存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:
4.4 Kafka高效性相關設計
4.4.1 消息的持久化
Kafka高度依賴文件系統來存儲和緩存消息,一般的人認為磁盤是緩慢的,這導致人們對持久化結構具有競爭性持懷疑態度。其實,磁盤遠比你想象的要快或者慢,這決定於我們如何使用磁盤。
一個和磁盤性能有關的關鍵事實是:磁盤驅動器的吞吐量跟尋到延遲是相背離的,也就是所,線性寫的速度遠遠大於隨機寫。比如:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是600M/秒,但是隨機寫的速度只有100K/秒,兩者相差將近6000倍。線性讀寫在大多數應用場景下是可以預測的,因此,操作系統利用read-ahead和write-behind技術來從大的數據塊中預取數據,或者將多個邏輯上的寫操作組合成一個大寫物理寫操作中。更多的討論可以在ACMQueueArtical中找到,他們發現,對磁盤的線性讀在有些情況下可以比內存的隨機訪問要快一些。
為了補償這個性能上的分歧,現代操作系統都會把空閑的內存用作磁盤緩存,盡管在內存回收的時候會有一點性能上的代價。所有的磁盤讀寫操作會在這個統一的緩存上進行。
此外,如果我們是在JVM的基礎上構建的,熟悉java內存應用管理的人應該清楚以下兩件事情:
- 一個對象的內存消耗是非常高的,經常是所存數據的兩倍或者更多。
- 隨着堆內數據的增多,Java的垃圾回收會變得非常昂貴。
基於這些事實,利用文件系統並且依靠頁緩存比維護一個內存緩存或者其他結構要好——我們至少要使得可用的緩存加倍,通過自動訪問可用內存,並且通過存儲更緊湊的字節結構而不是一個對象,這將有可能再次加倍。這么做的結果就是在一台32GB的機器上,如果不考慮GC懲罰,將最多有28-30GB的緩存。此外,這些緩存將會一直存在即使服務重啟,然而進程內緩存需要在內存中重構(10GB緩存需要花費10分鍾)或者它需要一個完全冷緩存啟動(非常差的初始化性能)。它同時也簡化了代碼,因為現在所有的維護緩存和文件系統之間內聚的邏輯都在操作系統內部了,這使得這樣做比one-off in-process attempts更加高效與准確。如果你的磁盤應用更加傾向於順序讀取,那么read-ahead在每次磁盤讀取中實際上獲取到這人緩存中的有用數據。
以上這些建議了一個簡單的設計:不同於維護盡可能多的內存緩存並且在需要的時候刷新到文件系統中,我們換一種思路。所有的數據不需要調用刷新程序,而是立刻將它寫到一個持久化的日志中。事實上,這僅僅意味着,數據將被傳輸到內核頁緩存中並稍后被刷新。我們可以增加一個配置項以讓系統的用戶來控制數據在什么時候被刷新到物理硬盤上。
4.4.2 常數時間性能保證
消息系統中持久化數據結構的設計通常是維護者一個和消費隊列有關的B樹或者其它能夠隨機存取結構的元數據信息。B樹是一個很好的結構,可以用在事務型與非事務型的語義中。但是它需要一個很高的花費,盡管B樹的操作需要O(logN)。通常情況下,這被認為與常數時間等價,但這對磁盤操作來說是不對的。磁盤尋道一次需要10ms,並且一次只能尋一個,因此並行化是受限的。
直覺上來講,一個持久化的隊列可以構建在對一個文件的讀和追加上,就像一般情況下的日志解決方案。盡管和B樹相比,這種結構不能支持豐富的語義,但是它有一個優點,所有的操作都是常數時間,並且讀寫之間不會相互阻塞。這種設計具有極大的性能優勢:最終系統性能和數據大小完全無關,服務器可以充分利用廉價的硬盤來提供高效的消息服務。
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味着我們可以提供一般消息系統無法提供的特性。比如說,消息被消費后不是立馬被刪除,我們可以將這些消息保留一段相對比較長的時間(比如一個星期)。
4.4.3 進一步提高效率
我們已經為效率做了非常多的努力。但是有一種非常主要的應用場景是:處理Web活動數據,它的特點是數據量非常大,每一次的網頁瀏覽都會產生大量的寫操作。更進一步,我們假設每一個被發布的消息都會被至少一個consumer消費,因此我們更要怒路讓消費變得更廉價。
通過上面的介紹,我們已經解決了磁盤方面的效率問題,除此之外,在此類系統中還有兩類比較低效的場景:
- 太多小的I/O操作
- 過多的字節拷貝
為了減少大量小I/O操作的問題,kafka的協議是圍繞消息集合構建的。Producer一次網絡請求可以發送一個消息集合,而不是每一次只發一條消息。在server端是以消息塊的形式追加消息到log中的,consumer在查詢的時候也是一次查詢大量的線性數據塊。消息集合即MessageSet,實現本身是一個非常簡單的API,它將一個字節數組或者文件進行打包。所以對消息的處理,這里沒有分開的序列化和反序列化的上步驟,消息的字段可以按需反序列化(如果沒有需要,可以不用反序列化)。
另一個影響效率的問題就是字節拷貝。為了解決字節拷貝的問題,kafka設計了一種“標准字節消息”,Producer、Broker、Consumer共享這一種消息格式。Kakfa的message log在broker端就是一些目錄文件,這些日志文件都是MessageSet按照這種“標准字節消息”格式寫入到磁盤的。
維持這種通用的格式對這些操作的優化尤為重要:持久化log 塊的網絡傳輸。流行的unix操作系統提供了一種非常高效的途徑來實現頁面緩存和socket之間的數據傳遞。在Linux操作系統中,這種方式被稱作:sendfile system call(Java提供了訪問這個系統調用的方法:FileChannel.transferTo api)。
為了理解sendfile的影響,需要理解一般的將數據從文件傳到socket的路徑:
- 操作系統將數據從磁盤讀到內核空間的頁緩存中
- 應用將數據從內核空間讀到用戶空間的緩存中
- 應用將數據寫回內核空間的socket緩存中
- 操作系統將數據從socket緩存寫到網卡緩存中,以便將數據經網絡發出
這種操作方式明顯是非常低效的,這里有四次拷貝,兩次系統調用。如果使用sendfile,就可以避免兩次拷貝:操作系統將數據直接從頁緩存發送到網絡上。所以在這個優化的路徑中,只有最后一步將數據拷貝到網卡緩存中是需要的。
我們期望一個主題上有多個消費者是一種常見的應用場景。利用上述的zero-copy,數據只被拷貝到頁緩存一次,然后就可以在每次消費時被重得利用,而不需要將數據存在內存中,然后在每次讀的時候拷貝到內核空間中。這使得消息消費速度可以達到網絡連接的速度。這樣以來,通過頁面緩存和sendfile的結合使用,整個kafka集群幾乎都已以緩存的方式提供服務,而且即使下游的consumer很多,也不會對整個集群服務造成壓力。
關於sendfile和zero-copy,請參考:zero-copy
五、Kafka集群部署
5.1 集群部署
為了提高性能,推薦采用專用的服務器來部署kafka集群,盡量與hadoop集群分開,因為kafka依賴磁盤讀寫和大的頁面緩存,如果和hadoop共享節點的話會影響其使用頁面緩存的性能。
Kafka集群的大小需要根據硬件的配置、生產者消費者的並發數量、數據的副本個數、數據的保存時長綜合確定。
磁盤的吞吐量尤為重要,因為通常kafka的瓶頸就在磁盤上。
Kafka依賴於zookeeper,建議采用專用服務器來部署zookeeper集群,zookeeper集群的節點采用偶數個,一般建議用3、5、7個。注意zookeeper集群越大其讀寫性能越慢,因為zookeeper需要在節點之間同步數據。一個3節點的zookeeper集群允許一個節點失敗,一個5節點集群允許2個幾點失敗。
5.2 集群大小
有很多因素決定着kafka集群需要具備存儲能力的大小,最准確的衡量辦法就是模擬負載來測算一下,Kafka本身也提供了負載測試的工具。
如果不想通過模擬實驗來評估集群大小,最好的辦法就是根據硬盤的空間需求來推算。下面我就根據網絡和磁盤吞吐量需求來做一下估算。
我們做如下假設:
- W:每秒寫多少MB
- R :副本數
- C :Consumer的數量
一般的來說,kafka集群瓶頸在於網絡和磁盤吞吐量,所以我們先評估一下集群的網絡和磁盤需求。
對於每條消息,每個副本都要寫一遍,所以整體寫的速度是W*R。讀數據的部分主要是集群內部各個副本從leader同步消息讀和集群外部的consumer讀,所以集群內部讀的速率是(R-1)*W,同時,外部consumer讀的速度是C*W,因此:
- Write:W*R
- Read:(R-1)*W+C*W
需要注意的是,我們可以在讀的時候緩存部分數據來減少IO操作,如果一個集群有M MB內存,寫的速度是W MB/sec,則允許M/(W*R) 秒的寫可以被緩存。如果集群有32GB內存,寫的速度是50MB/s的話,則可以至少緩存10分鍾的數據。
5.3 Kafka性能測試
5.4 Kafka在zookeeper中的數據結構
Kafka data structures in Zookeeper
六、Kafka主要配置
6.1 Broker Config
屬性 | 默認值 | 描述 |
---|---|---|
broker.id | 必填參數,broker的唯一標識 | |
log.dirs | /tmp/kafka-logs | Kafka數據存放的目錄。可以指定多個目錄,中間用逗號分隔,當新partition被創建的時會被存放到當前存放partition最少的目錄。 |
port | 9092 | BrokerServer接受客戶端連接的端口號 |
zookeeper.connect | null | Zookeeper的連接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3。可以填一個或多個,為了提高可靠性,建議都填上。注意,此配置允許我們指定一個zookeeper路徑來存放此kafka集群的所有數據,為了與其他應用集群區分開,建議在此配置中指定本集群存放目錄,格式為:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消費者的參數要和此參數一致。 |
message.max.bytes | 1000000 | 服務器可以接收到的最大的消息大小。注意此參數要和consumer的maximum.message.size大小一致,否則會因為生產者生產的消息太大導致消費者無法消費。 |
num.io.threads | 8 | 服務器用來執行讀寫請求的IO線程數,此參數的數量至少要等於服務器上磁盤的數量。 |
queued.max.requests | 500 | I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程將停止接收新的請求。 |
socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUFF buffer the server prefers for socket connections. |
socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUFF buffer the server prefers for socket connections. |
socket.request.max.bytes | 100 * 1024 * 1024 | 服務器允許請求的最大值, 用來防止內存溢出,其值應該小於 Java heap size. |
num.partitions | 1 | 默認partition數量,如果topic在創建時沒有指定partition數量,默認使用此值,建議改為5 |
log.segment.bytes | 1024 * 1024 * 1024 | Segment文件的大小,超過此值將會自動新建一個segment,此值可以被topic級別的參數覆蓋。 |
log.roll.{ms,hours} | 24 * 7 hours | 新建segment文件的時間,此值可以被topic級別的參數覆蓋。 |
log.retention.{ms,minutes,hours} | 7 days | Kafka segment log的保存周期,保存周期超過此時間日志就會被刪除。此參數可以被topic級別參數覆蓋。數據量大時,建議減小此值。 |
log.retention.bytes | -1 | 每個partition的最大容量,若數據量超過此值,partition數據將會被刪除。注意這個參數控制的是每個partition而不是topic。此參數可以被log級別參數覆蓋。 |
log.retention.check.interval.ms | 5 minutes | 刪除策略的檢查周期 |
auto.create.topics.enable | true | 自動創建topic參數,建議此值設置為false,嚴格控制topic管理,防止生產者錯寫topic。 |
default.replication.factor | 1 | 默認副本數量,建議改為2。 |
replica.lag.time.max.ms | 10000 | 在此窗口時間內沒有收到follower的fetch請求,leader會將其從ISR(in-sync replicas)中移除。 |
replica.lag.max.messages | 4000 | 如果replica節點落后leader節點此值大小的消息數量,leader節點就會將其從ISR中移除。 |
replica.socket.timeout.ms | 30 * 1000 | replica向leader發送請求的超時時間。 |
replica.socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests to the leader for replicating data. |
replica.fetch.max.bytes | 1024 * 1024 | The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. |
replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader. |
num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker. |
fetch.purgatory.purge.interval.requests | 1000 | The purge interval (in number of requests) of the fetch request purgatory. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session 超時時間。如果在此時間內server沒有向zookeeper發送心跳,zookeeper就會認為此節點已掛掉。 此值太低導致節點容易被標記死亡;若太高,.會導致太遲發現節點死亡。 |
zookeeper.connection.timeout.ms | 6000 | 客戶端連接zookeeper的超時時間。 |
zookeeper.sync.time.ms | 2000 | H ZK follower落后 ZK leader的時間。 |
controlled.shutdown.enable | true | 允許broker shutdown。如果啟用,broker在關閉自己之前會把它上面的所有leaders轉移到其它brokers上,建議啟用,增加集群穩定性。 |
auto.leader.rebalance.enable | true | If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available. |
leader.imbalance.per.broker.percentage | 10 | The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker. |
leader.imbalance.check.interval.seconds | 300 | The frequency with which to check for leader imbalance. |
offset.metadata.max.bytes | 4096 | The maximum amount of metadata to allow clients to save with their offsets. |
connections.max.idle.ms | 600000 | Idle connections timeout: the server socket processor threads close the connections that idle more than this. |
num.recovery.threads.per.data.dir | 1 | The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. |
unclean.leader.election.enable | true | Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss. |
delete.topic.enable | false | 啟用deletetopic參數,建議設置為true。 |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic. |
offsets.retention.check.interval.ms | 600000 | The frequency at which the offset manager checks for stale offsets. |
offsets.topic.replication.factor | 3 | The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas. |
offsets.topic.segment.bytes | 104857600 | Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads. |
offsets.load.buffer.size | 5242880 | An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache. |
offsets.commit.required.acks | -1 | The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden. |
offsets.commit.timeout.ms | 5000 | The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout. |
6.2 Producer Config
屬性 | 默認值 | 描述 |
---|---|---|
metadata.broker.list | 啟動時producer查詢brokers的列表,可以是集群中所有brokers的一個子集。注意,這個參數只是用來獲取topic的元信息用,producer會從元信息中挑選合適的broker並與之建立socket連接。格式是:host1:port1,host2:port2。 | |
request.required.acks | 0 | 參見3.2節介紹 |
request.timeout.ms | 10000 | Broker等待ack的超時時間,若等待時間超過此值,會返回客戶端錯誤信息。 |
producer.type | sync | 同步異步模式。async表示異步,sync表示同步。如果設置成異步模式,可以允許生產者以batch的形式push數據,這樣會極大的提高broker性能,推薦設置為異步。 |
serializer.class | kafka.serializer.DefaultEncoder | 序列號類,.默認序列化成 byte[] 。 |
key.serializer.class | Key的序列化類,默認同上。 | |
partitioner.class | kafka.producer.DefaultPartitioner | Partition類,默認對key進行hash。 |
compression.codec | none | 指定producer消息的壓縮格式,可選參數為: “none”, “gzip” and “snappy”。關於壓縮參見4.1節 |
compressed.topics | null | 啟用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那么壓縮僅對本參數指定的topic有效,若本參數為空,則對所有topic有效。 |
message.send.max.retries | 3 | Producer發送失敗時重試次數。若網絡出現問題,可能會導致不斷重試。 |
retry.backoff.ms | 100 | Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. |
topic.metadata.refresh.interval.ms | 600 * 1000 | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed |
queue.buffering.max.ms | 5000 | 啟用異步模式時,producer緩存消息的時間。比如我們設置成1000時,它會緩存1秒的數據再一次發送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。 |
queue.buffering.max.messages | 10000 | 采用異步模式時producer buffer 隊列里最大緩存的消息數量,如果超過這個數值,producer就會阻塞或者丟掉消息。 |
queue.enqueue.timeout.ms | -1 | 當達到上面參數值時producer阻塞等待的時間。如果值設置為0,buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值設置為-1,producer會被阻塞,不會丟消息。 |
batch.num.messages | 200 | 采用異步模式時,一個batch緩存的消息數量。達到這個數量值時producer才會發送消息。 |
send.buffer.bytes | 100 * 1024 | Socket write buffer size |
client.id | “” | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. |
6.3 Consumer Config
屬性 | 默認值 | 描述 |
---|---|---|
group.id | Consumer的組ID,相同goup.id的consumer屬於同一個組。 | |
zookeeper.connect | Consumer的zookeeper連接串,要和broker的配置一致。 | |
consumer.id | null | 如果不設置會自動生成。 |
socket.timeout.ms | 30 * 1000 | 網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定。 |
socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests. |
fetch.message.max.bytes | 1024 * 1024 | 查詢topic-partition時允許的最大消息大小。consumer會為每個partition緩存此大小的消息到內存,因此,這個參數可以控制consumer的內存使用量。這個值應該至少比server允許的最大消息大小大,以免producer發送的消息大於consumer允許的消息。 |
num.consumer.fetchers | 1 | The number fetcher threads used to fetch data. |
auto.commit.enable | true | 如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。 |
auto.commit.interval.ms | 60 * 1000 | Consumer提交offset值到zookeeper的周期。 |
queued.max.message.chunks | 2 | 用來被consumer消費的message chunks 數量, 每個chunk可以緩存fetch.message.max.bytes大小的數據量。 |
rebalance.max.retries | 4 | When a new consumer joins a consumer group the set of consumers attempt to “rebalance” the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up. |
fetch.min.bytes | 1 | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. |
fetch.wait.max.ms | 100 | The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes. |
rebalance.backoff.ms | 2000 | Backoff time between retries during rebalance. |
refresh.leader.backoff.ms | 200 | Backoff time to wait before trying to determine the leader of a partition that has just lost its leader. |
auto.offset.reset | largest | What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer |
consumer.timeout.ms | -1 | 若在指定時間內沒有消息消費,consumer將會拋出異常。 |
exclude.internal.topics | true | Whether messages from internal topics (such as offsets) should be exposed to the consumer. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. |
zookeeper.connection.timeout.ms | 6000 | The max time that the client waits while establishing a connection to zookeeper. |
zookeeper.sync.time.ms | 2000 | How far a ZK follower can be behind a ZK leader |
6.4 Topic 級別的配置
Kafka史上最詳細原理總結



2.Kafka文件存儲機制
- Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
- Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
- Segment:partition物理上由多個segment組成,每個Segment存着message信息
- Producer : 生產message發送到topic
- Consumer : 訂閱topic消費message, consumer作為一個線程來消費
- Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理, 除非來自不同的consumer group。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
- 2.2 kafka一些原理概念

- 2.3 kafka拓撲結構
分析過程分為以下4個步驟:
- topic中partition存儲分布
- partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
- partiton中segment文件存儲結構
- 在partition中如何通過offset查找message
通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。
假設實驗環境中Kafka集群只有一個broker,xxx/message-folder為數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如創建2個topic名 稱分別為report_push、launch_info, partitions數量都為partitions=4
存儲路徑和目錄規則為:
xxx/message-folder
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
- 每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
- 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。

- segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件.
- segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個全局partion的最大offset(偏移message數)。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。
下面文件列表是筆者在Kafka broker上做的一個實驗,創建一個topicXXX包含1 partition,設置每個segment大小為500MB,並啟動producer向Kafka broker寫入大量數據,如下圖2所示segment文件列表形象說明了上述2個規則:

以上述圖2中一對segment file文件為例,說明segment中index<—->data file對應關系物理結構如下:

上述圖3中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。其中以索引文件中 元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移 地址為497。
從上述圖3了解到segment data file由許多message組成,下面詳細說明message物理結構如下:

參數說明:
關鍵字 | 解釋說明 |
---|---|
8 byte offset | 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱為偏移(offset),它可以唯一確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校驗message |
1 byte “magic" | 表示本次發布Kafka服務程序協議版本號 |
1 byte “attributes" | 表示為獨立版本、或標識壓縮類型、或編碼類型。 |
4 byte key length | 表示key的長度,當key為-1時,K byte key字段不填 |
K byte key | 可選 |
value bytes payload | 表示實際消息數據。 |
例如讀取offset=368776的message,需要通過下面2個步驟查找。
-
第一步查找segment file
上述圖2為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件 00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個文件00000000000000737337.index的起始偏移量為737338=737337 + 1,其他后續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。
當offset=368776時定位到00000000000000368769.index|log
-
第二步通過segment file查找message通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和 00000000000000368769.log的物理偏移地址,然后再通過00000000000000368769.log順序查找直到 offset=368776為止。
Kafka高效文件存儲設計特點
- Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
- 通過索引信息可以快速定位message和確定response的最大大小。
- 通過index元數據全部映射到memory,可以避免segment file的IO磁盤操作。
- 通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。
1. Kafka集群partition replication默認自動分配分析
下面以一個Kafka集群中4個Broker舉例,創建1個topic包含4個Partition,2 Replication;數據Producer流動如圖所示:
(1)
(2)當集群中新增2節點,Partition增加到6個時分布情況如下:
副本分配邏輯規則如下:
- 在Kafka集群中,每個Broker都有均等分配Partition的Leader機會。
- 上述圖Broker Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。
- 上述圖種每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此循環迭代分配,多副本都遵循此規則。
- 將所有N Broker和待分配的i個Partition排序.
- 將第i個Partition分配到第(i mod n)個Broker上.
- 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.

-
生產者客戶端應用程序產生消息:
-
客戶端連接對象將消息包裝到請求中發送到服務端
-
服務端的入口也有一個連接對象負責接收請求,並將消息以文件的形式存儲起來
-
服務端返回響應結果給生產者客戶端
-
-
消費者客戶端應用程序消費消息:
-
客戶端連接對象將消費信息也包裝到請求中發送給服務端
-
服務端從文件存儲系統中取出消息
-
服務端返回響應結果給消費者客戶端
-
客戶端將響應結果還原成消息並開始處理消息
-

2. 不創建單獨的cache,使用系統的page cache。發布者順序發布,訂閱者通常比發布者滯后一點點,直接使用linux的page cache效果也比較后,同時減少了cache管理及垃圾收集的開銷。
3. 維護消費關系及每個partition的消費信息。
6.2 Zookeeper上的細節:
1. 每個broker啟動后會在zookeeper上注冊一個臨時的broker registry,包含broker的ip地址和端口號,所存儲的topics和partitions信息。
2. 每個consumer啟動后會在zookeeper上注冊一個臨時的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。
3. 每個consumer group關聯一個臨時的owner registry和一個持久的offset registry。對於被訂閱的每個partition包含一個owner registry,內容為訂閱這個partition的consumer id;同時包含一個offset registry,內容為上一次訂閱的offset。