kafka-工作原理


參考:

https://www.cnblogs.com/sujing/p/10960832.html

https://blog.csdn.net/suifeng3051/article/details/48053965

https://blog.csdn.net/u013573133/article/details/48142677

 

 

 

 

再過半小時,你就能明白kafka的工作原理了

本文在個人技術博客不同步發布,詳情可猛戳
亦可掃描屏幕右側二維碼關注個人公眾號,公眾號內有個人聯系方式,等你來撩...

為什么需要消息隊列

  周末無聊刷着手機,某寶網APP突然蹦出來一條消息“為了回饋老客戶,女朋友買一送一,活動僅限今天!”。買一送一還有這種好事,那我可不能錯過!忍不住立馬點了去。於是選了兩個最新款,下單、支付一氣呵成!滿足的躺在床上,想着馬上有女朋友了,竟然幸福的失眠了……
  第二天正常上着班,突然接到快遞小哥的電話:
  小哥:“你是xx嗎?你的女朋友到了,我現在在你樓下,你來拿一下吧!”。
  我:“這……我在上班呢,可以晚上送過來嗎?“。
  小哥:“晚上可不行哦,晚上我也下班了呢!”。
  於是兩個人僵持了很久……
  最后小哥說,要不我幫你放到樓下小芳便利店吧,你晚上下班了過來拿,尷尬的局面這才得以緩解!

  回到正題,如果沒有小芳便利店,那快遞小哥和我的交互圖就應該如下:
  
   

  會出現什么情況呢?
  1、為了這個女朋友,我請假回去拿(老板不批)。
  2、小哥一直在你樓下等(小哥還有其他的快遞要送)。
  3、周末再送(顯然等不及)。
  4、這個女朋友我不要了(絕對不可能)!

  小芳便利店出現后,交互圖就應如下:
  
  

  在上面例子中,“快遞小哥”和“買女朋友的我”就是需要交互的兩個系統,小芳便利店就是我們本文要講的-“消息中間件”。總結下來小芳便利店(消息中間件)出現后有如下好處:
  1、 解耦
  快遞小哥手上有很多快遞需要送,他每次都需要先電話一一確認收貨人是否有空、哪個時間段有空,然后再確定好送貨的方案。這樣完全依賴收貨人了!如果快遞一多,快遞小哥估計的忙瘋了……如果有了便利店,快遞小哥只需要將同一個小區的快遞放在同一個便利店,然后通知收貨人來取貨就可以了,這時候快遞小哥和收貨人就實現了解耦!

  2、 異步
  快遞小哥打電話給我后需要一直在你樓下等着,直到我拿走你的快遞他才能去送其他人的。快遞小哥將快遞放在小芳便利店后,又可以干其他的活兒去了,不需要等待你到來而一直處於等待狀態。提高了工作的效率。

  3、 削峰
  假設雙十一我買了不同店里的各種商品,而恰巧這些店發貨的快遞都不一樣,有中通、圓通、申通、各種通等……更巧的是他們都同時到貨了!中通的小哥打來電話叫我去北門取快遞、圓通小哥叫我去南門、申通小哥叫我去東門。我一時手忙腳亂……

  我們能看到在系統需要交互的場景中,使用消息隊列中間件真的是好處多多,基於這種思路,就有了豐巢、菜鳥驛站等比小芳便利店更專業的“中間件”了。
  最后,上面的故事純屬虛構……

消息隊列通信的模式

  通過上面的例子我們引出了消息中間件,並且介紹了消息隊列出現后的好處,這里就需要介紹消息隊列通信的兩種模式了:

一、 點對點模式

  

  如上圖所示,點對點模式通常是基於拉取或者輪詢的消息傳送模型,這個模型的特點是發送到隊列的消息被一個且只有一個消費者進行處理。生產者將消息放入消息隊列后,由消費者主動的去拉取消息進行消費。點對點模型的的優點是消費者拉取消息的頻率可以由自己控制。但是消息隊列是否有消息需要消費,在消費者端無法感知,所以在消費者端需要額外的線程去監控。

二、 發布訂閱模式

  

  如上圖所示,發布訂閱模式是一個基於消息送的消息傳送模型,改模型可以有多種不同的訂閱者。生產者將消息放入消息隊列后,隊列會將消息推送給訂閱過該類消息的消費者(類似微信公眾號)。由於是消費者被動接收推送,所以無需感知消息隊列是否有待消費的消息!但是consumer1、consumer2、consumer3由於機器性能不一樣,所以處理消息的能力也會不一樣,但消息隊列卻無法感知消費者消費的速度!所以推送的速度成了發布訂閱模模式的一個問題!假設三個消費者處理速度分別是8M/s、5M/s、2M/s,如果隊列推送的速度為5M/s,則consumer3無法承受!如果隊列推送的速度為2M/s,則consumer1、consumer2會出現資源的極大浪費!

Kafka

  上面簡單的介紹了為什么需要消息隊列以及消息隊列通信的兩種模式,接下來就到了我們本文的主角——kafka閃亮登場的時候了!Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據,具有高性能、持久化、多副本備份、橫向擴展能力……… 一些基本的介紹這里就不展開了,網上有太多關於這些的介紹了,讀者可以自行百度一下!

基礎架構及術語

  話不多說,先看圖,通過這張圖我們來捋一捋相關的概念及之間的關系:

  

  如果看到這張圖你很懵逼,木有關系!我們先來分析相關概念
  Producer:Producer即生產者,消息的產生者,是消息的入口。
  kafka cluster
    Broker:Broker是kafka實例,每個服務器上有一個或多個kafka的實例,我們姑且認為每個broker對應一台服務器。每個kafka集群內的broker都有一個不重復的編號,如圖中的broker-0、broker-1等……
    Topic:消息的主題,可以理解為消息的分類,kafka的數據就保存在topic。在每個broker上都可以創建多個topic。
    Partition:Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。同一個topic在不同的分區的數據是不重復的,partition的表現形式就是一個一個的文件夾!
    Replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數量是10個,且副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也只可能存放一個副本(包括自己)。
    Message:每一條發送的消息主體。
  Consumer:消費者,即消息的消費方,是消息的出口。
  Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設計中同一個分區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的數據,這也是為了提高kafka的吞吐量!
  Zookeeper:kafka集群依賴zookeeper來保存集群的的元信息,來保證系統的可用性。

工作流程分析

  上面介紹了kafka的基礎架構及基本概念,不知道大家看完有沒有對kafka有個大致印象,如果對還比較懵也沒關系!我們接下來再結合上面的結構圖分析kafka的工作流程,最后再回來整個梳理一遍我相信你會更有收獲!

發送數據

  我們看上面的架構圖中,producer就是生產者,是數據的入口。注意看圖中的紅色箭頭,Producer在寫入數據的時候永遠的找leader,不會直接將數據寫入follower!那leader怎么找呢?寫入的流程又是什么樣的呢?我們看下圖:

  

  發送的流程就在圖中已經說明了,就不單獨在文字列出來了!需要注意的一點是,消息寫入leader后,follower是主動的去leader進行同步的!producer采用push模式將數據發布到broker,每條消息追加到分區中,順序寫入磁盤,所以保證同一分區內的數據是有序的!寫入示意圖如下:

  

  上面說到數據會寫入到不同的分區,那kafka為什么要做分區呢?相信大家應該也能猜到,分區的主要目的是:
  1、 方便擴展。因為一個topic可以有多個partition,所以我們可以通過擴展機器去輕松的應對日益增長的數據量。
  2、 提高並發。以partition為讀寫單位,可以多個消費者同時消費數據,提高了消息的處理效率。

  熟悉負載均衡的朋友應該知道,當我們向某個服務器發送請求的時候,服務端可能會對請求做一個負載,將流量分發到不同的服務器,那在kafka中,如果某個topic有多個partition,producer又怎么知道該將數據發往哪個partition呢?kafka中有幾個原則:
  1、 partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
  2、 如果沒有指定partition,但是設置了數據的key,則會根據key的值hash出一個partition。
  3、 如果既沒指定partition,又沒有設置key,則會輪詢選出一個partition。

  保證消息不丟失是一個消息隊列中間件的基本保證,那producer在向kafka寫入消息的時候,怎么保證消息不丟失呢?其實上面的寫入流程圖中有描述出來,那就是通過ACK應答機制!在生產者向隊列寫入數據的時候可以設置參數來確定是否確認kafka接收到數據,這個參數可設置的值為01all
  0代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效率最高。
  1代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
  all代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保leader發送成功和所有的副本都完成備份。安全性最高,但是效率最低。

  最后要注意的是,如果往不存在的topic寫數據,能不能寫入成功呢?kafka會自動創建topic,分區和副本的數量根據默認配置都是1。

保存數據

  Producer將數據寫入kafka后,集群就需要對數據進行保存了!kafka將數據保存在磁盤,可能在我們的一般的認知里,寫入磁盤是比較耗時的操作,不適合這種高並發的組件。Kafka初始會單獨開辟一塊磁盤空間,順序寫入數據(效率比隨機寫入高)。

Partition 結構
  前面說過了每個topic都可以分為一個或多個partition,如果你覺得topic比較抽象,那partition就是比較具體的東西了!Partition在服務器上的表現形式就是一個一個的文件夾,每個partition的文件夾下面會有多組segment文件,每組segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中沒有)三個文件, log文件就實際是存儲message的地方,而index和timeindex文件為索引文件,用於檢索消息。

  

  如上圖,這個partition有三組segment文件,每個log文件的大小是一樣的,但是存儲的message數量是不一定相等的(每條的message大小不一致)。文件的命名是以該segment最小offset來命名的,如000.index存儲offset為0~368795的消息,kafka就是利用分段+索引的方式來解決查找效率的問題。

Message結構
上面說到log文件就實際是存儲message的地方,我們在producer往kafka寫入的也是一條一條的message,那存儲在log中的message是什么樣子的呢?消息主要包含消息體、消息大小、offset、壓縮類型……等等!我們重點需要知道的是下面三個:
  1、 offset:offset是一個占8byte的有序id號,它可以唯一確定每條消息在parition內的位置!
  2、 消息大小:消息大小占用4byte,用於描述消息的大小。
  3、 消息體:消息體存放的是實際的消息數據(被壓縮過),占用的空間根據具體的消息而不一樣。

存儲策略
  無論消息是否被消費,kafka都會保存所有的消息。那對於舊數據有什么刪除策略呢?
  1、 基於時間,默認配置是168小時(7天)。
  2、 基於大小,默認配置是1073741824。
  需要注意的是,kafka讀取特定消息的時間復雜度是O(1),所以這里刪除過期的文件並不會提高kafka的性能!

消費數據

  消息存儲在log文件后,消費者就可以進行消費了。與生產消息相同的是,消費者在拉取消息的時候也是找leader去拉取。

  多個消費者可以組成一個消費者組(consumer group),每個消費者組都有一個組id!同一個消費組者的消費者可以消費同一topic下不同分區的數據,但是不會組內多個消費者消費同一分區的數據!!!是不是有點繞。我們看下圖:

  

  圖示是消費者組內的消費者小於partition數量的情況,所以會出現某個消費者消費多個partition數據的情況,消費的速度也就不及只處理一個partition的消費者的處理速度!如果是消費者組的消費者多於partition的數量,那會不會出現多個消費者消費同一個partition的數據呢?上面已經提到過不會出現這種情況!多出來的消費者不消費任何partition的數據。所以在實際的應用中,建議消費者組的consumer的數量與partition的數量一致
  在保存數據的小節里面,我們聊到了partition划分為多組segment,每個segment又包含.log、.index、.timeindex文件,存放的每條message包含offset、消息大小、消息體……我們多次提到segment和offset,查找消息的時候是怎么利用segment+offset配合查找的呢?假如現在需要查找一個offset為368801的message是什么樣的過程呢?我們先看看下面的圖:

  1、 先找到offset的368801message所在的segment文件(利用二分法查找),這里找到的就是在第二個segment文件。
  2、 打開找到的segment中的.index文件(也就是368796.index文件,該文件起始偏移量為368796+1,我們要查找的offset為368801的message在該index內的偏移量為368796+5=368801,所以這里要查找的相對offset為5)。由於該文件采用的是稀疏索引的方式存儲着相對offset及對應message物理偏移量的關系,所以直接找相對offset為5的索引找不到,這里同樣利用二分法查找相對offset小於或者等於指定的相對offset的索引條目中最大的那個相對offset,所以找到的是相對offset為4的這個索引。
  3、 根據找到的相對offset為4的索引確定message存儲的物理偏移位置為256。打開數據文件,從位置為256的那個地方開始順序掃描直到找到offset為368801的那條Message。

  這套機制是建立在offset為有序的基礎上,利用segment+有序offset+稀疏索引+二分查找+順序查找等多種手段來高效的查找數據!至此,消費者就能拿到需要處理的數據進行處理了。那每個消費者又是怎么記錄自己消費的位置呢?在早期的版本中,消費者將消費到的offset維護zookeeper中,consumer每間隔一段時間上報一次,這里容易導致重復消費,且性能不好!在新的版本中消費者消費到的offset已經直接維護在kafk集群的__consumer_offsets這個topic中!

  最后,推薦極客時間上的一門不錯的架構師課程,走向架構師必備!
    

 

 

 

 

 

 

 

 

Kafka 設計與原理詳解

一、Kafka簡介

本文綜合了我之前寫的kafka相關文章,可作為一個全面了解學習kafka的培訓學習資料。
  • 1

轉載請注明出處 : 本文鏈接

1.1 背景歷史

當今社會各種應用系統諸如商業、社交、搜索、瀏覽等像信息工廠一樣不斷的生產出各種信息,在大數據時代,我們面臨如下幾個挑戰:

  1. 如何收集這些巨大的信息
  2. 如何分析它
  3. 如何及時做到如上兩點

以上幾個挑戰形成了一個業務需求模型,即生產者生產(produce)各種信息,消費者消費(consume)(處理分析)這些信息,而在生產者與消費者之間,需要一個溝通兩者的橋梁-消息系統。從一個微觀層面來說,這種需求也可理解為不同的系統之間如何傳遞消息。

1.2 Kafka誕生

Kafka由 linked-in 開源
kafka-即是解決上述這類問題的一個框架,它實現了生產者和消費者之間的無縫連接。
kafka-高產出的分布式消息系統(A high-throughput distributed messaging system)

1.3 Kafka現在

Apache kafka 是一個分布式的基於push-subscribe的消息系統,它具備快速、可擴展、可持久化的特點。它現在是Apache旗下的一個開源系統,作為hadoop生態系統的一部分,被各種商業公司廣泛應用。它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。

二、Kafka技術概覽

2.1 Kafka的特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
  • 可擴展性:kafka集群支持熱擴展
  • 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失
  • 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高並發:支持數千個客戶端同時讀寫

2.2 Kafka一些重要設計思想

下面介紹先大體介紹一下Kafka的主要設計思想,可以讓相關人員在短時間內了解到kafka相關特性,如果想深入研究,后面會對其中每一個特性都做詳細介紹。

  • Consumergroup:各個consumer可以組成一個組,每個消息只能被組中的一個consumer消費,如果一個消息可以被多個consumer消費的話,那么這些consumer必須在不同的組。
  • 消息狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
  • 消息持久化:Kafka中會把消息持久化到本地文件系統中,並且保持極高的效率。
  • 消息有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。
  • 批量發送:Kafka支持以消息集合為單位進行批量發送,以提高push效率。
  • push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產和消費是異步的。
  • Kafka集群中broker之間的關系:不是主從關系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節點。
  • 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
  • 同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。
  • 分區機制partition:Kafka的broker端支持消息分區,Producer可以決定把消息發到哪個分區,在一個分區中消息的順序就是Producer發送消息的順序,一個主題中可以有多個分區,具體分區的數量是可配置的。分區的意義很重大,后面的內容會逐漸體現。
  • 離線數據裝載:Kafka由於對可拓展的數據持久化的支持,它也非常適合向Hadoop或者數據倉庫中進行數據裝載。
  • 插件支持:現在不少活躍的社區已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。

2.3 kafka 應用場景

  • 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 消息系統:解耦和生產者和消費者、緩存消息等。
  • 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
  • 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
  • 流式處理:比如spark streaming和storm
  • 事件源

2.4 Kafka架構組件

Kafka中發布訂閱的對象是topic。我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。

  • topic:消息存放的目錄即主題
  • Producer:生產消息到topic的一方
  • Consumer:訂閱topic消費消息的一方
  • Broker:Kafka的服務實例就是一個broker

2.5 Kafka Topic&Partition

消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:

我們可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
Kafka集群會保存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,只有過期的數據才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內的所有消息都會被保存到集群中,數據只有超過了兩天才會被清除。
Kafka需要維持的元數據只有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀態完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;第二就是可以提高並發,因為可以以Partition為單位讀寫了。

三、Kafka 核心組件

3.1 Replications、Partitions 和Leaders

通過上面介紹的我們可以知道,kafka中的數據是持久化的並且能夠容錯的。Kafka允許用戶為每個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。如果你的副本數量設置為3,那么一份數據就會被存放在3台不同的機器上,那么就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到數據消費。如果對數據持久化有更高的要求,可以把副本數量設置為3或者更多。
Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生產數據時,會按照一定規則(這個規則是可以自定義的)把消息發布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。
關於如何設置partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。所以,推薦partition的數量一定要大於同時運行的consumer的數量。另外一方面,建議partition的數量大於集群broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內存來緩存消息數據,如果partition數量越大,就要為kafka分配更大的heap space。

3.2 Producers

Producers直接發送消息到broker上的leader partition,不需要經過任何中介一系列的路由轉發。為了實現這個特性,kafka集群中的每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的。
Producer客戶端自己控制着消息被推送到哪些partition。實現的方式可以是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的分區,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個分區。
以Batch的方式推送數據可以極大的提高處理效率,kafka Producer 可以將消息在內存中累計到一定數量后作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設置為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設置需要在效率和時效性方面做一個權衡。
Producers可以異步的並行的向kafka發送消息,但是通常producer在發送完消息之后會得到一個future響應,返回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到確認的副本個數,如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這樣有可能會導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。
若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。
Kafka 消息有一個定長的header和變長的字節數組組成。因為kafka消息支持字節數組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。

3.3 Consumers

Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。
在kafka中,當前讀到消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數據。比如,consumer可以通過重設offset值來重新消費已消費過的數據。不管有沒有被消費,kafka會保存數據一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數據。
High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態,即每次消費的都是下一個消息。
High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。

四、Kafka核心特性

4.1 壓縮

我們上面已經知道了Kafka支持以集合(batch)為單位發送消息,在此基礎上,Kafka還支持對消息集合進行壓縮,Producer端可以通過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮之后,在Consumer端需進行解壓。壓縮的好處就是減少傳輸的數據量,減輕對網絡傳輸的壓力,在對大數據處理上,瓶頸往往體現在網絡上而不是CPU(壓縮和解壓會耗掉部分CPU資源)。
那么如何區分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節,這個字節的后兩位表示消息的壓縮采用的編碼,如果后兩位為0,則表示消息未被壓縮。

4.2消息可靠性

在消息系統中,保證消息在生產和消費過程中的可靠性是十分重要的,在實際消息傳遞過程中,可能會出現如下三中情況:

  • 一個消息發送失敗
  • 一個消息被發送多次
  • 最理想的情況:exactly-once ,一個消息發送成功且僅發送了一次

有許多系統聲稱它們實現了exactly-once,但是它們其實忽略了生產者或消費者在生產和消費過程中有可能失敗的情況。比如雖然一個Producer成功發送一個消息,但是消息在發送途中丟失,或者成功發送到broker,也被consumer成功取走,但是這個consumer在處理取過來的消息時失敗了。
從Producer端看:Kafka是這么處理的,當一個消息被發送后,Producer會等待broker成功接收到消息的反饋(可通過參數控制等待時間),如果消息在途中丟失或是其中一個broker掛掉,Producer會重新發送(我們知道Kafka有備份機制,可以通過參數控制是否等待所有備份節點都收到消息)。
從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程中掛掉,此時Consumer可以通過這個offset值重新找到上一個消息再進行處理。Consumer還有權限控制這個offset值,對持久化到broker端的消息做任意處理。

4.3 備份機制

備份機制是Kafka0.8版本的新特性,備份機制的出現大大提高了Kafka集群的可靠性、穩定性。有了備份機制后,Kafka允許集群中的節點掛掉后而不影響整個集群工作。一個備份數量為n的集群允許n-1個節點失敗。在所有備份節點中,有一個節點作為lead節點,這個節點保存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:

這里寫圖片描述

4.4 Kafka高效性相關設計

4.4.1 消息的持久化

Kafka高度依賴文件系統來存儲和緩存消息,一般的人認為磁盤是緩慢的,這導致人們對持久化結構具有競爭性持懷疑態度。其實,磁盤遠比你想象的要快或者慢,這決定於我們如何使用磁盤。
一個和磁盤性能有關的關鍵事實是:磁盤驅動器的吞吐量跟尋到延遲是相背離的,也就是所,線性寫的速度遠遠大於隨機寫。比如:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是600M/秒,但是隨機寫的速度只有100K/秒,兩者相差將近6000倍。線性讀寫在大多數應用場景下是可以預測的,因此,操作系統利用read-ahead和write-behind技術來從大的數據塊中預取數據,或者將多個邏輯上的寫操作組合成一個大寫物理寫操作中。更多的討論可以在ACMQueueArtical中找到,他們發現,對磁盤的線性讀在有些情況下可以比內存的隨機訪問要快一些。
為了補償這個性能上的分歧,現代操作系統都會把空閑的內存用作磁盤緩存,盡管在內存回收的時候會有一點性能上的代價。所有的磁盤讀寫操作會在這個統一的緩存上進行。
此外,如果我們是在JVM的基礎上構建的,熟悉java內存應用管理的人應該清楚以下兩件事情:

  1. 一個對象的內存消耗是非常高的,經常是所存數據的兩倍或者更多。
  2. 隨着堆內數據的增多,Java的垃圾回收會變得非常昂貴。

基於這些事實,利用文件系統並且依靠頁緩存比維護一個內存緩存或者其他結構要好——我們至少要使得可用的緩存加倍,通過自動訪問可用內存,並且通過存儲更緊湊的字節結構而不是一個對象,這將有可能再次加倍。這么做的結果就是在一台32GB的機器上,如果不考慮GC懲罰,將最多有28-30GB的緩存。此外,這些緩存將會一直存在即使服務重啟,然而進程內緩存需要在內存中重構(10GB緩存需要花費10分鍾)或者它需要一個完全冷緩存啟動(非常差的初始化性能)。它同時也簡化了代碼,因為現在所有的維護緩存和文件系統之間內聚的邏輯都在操作系統內部了,這使得這樣做比one-off in-process attempts更加高效與准確。如果你的磁盤應用更加傾向於順序讀取,那么read-ahead在每次磁盤讀取中實際上獲取到這人緩存中的有用數據。
以上這些建議了一個簡單的設計:不同於維護盡可能多的內存緩存並且在需要的時候刷新到文件系統中,我們換一種思路。所有的數據不需要調用刷新程序,而是立刻將它寫到一個持久化的日志中。事實上,這僅僅意味着,數據將被傳輸到內核頁緩存中並稍后被刷新。我們可以增加一個配置項以讓系統的用戶來控制數據在什么時候被刷新到物理硬盤上。

4.4.2 常數時間性能保證

消息系統中持久化數據結構的設計通常是維護者一個和消費隊列有關的B樹或者其它能夠隨機存取結構的元數據信息。B樹是一個很好的結構,可以用在事務型與非事務型的語義中。但是它需要一個很高的花費,盡管B樹的操作需要O(logN)。通常情況下,這被認為與常數時間等價,但這對磁盤操作來說是不對的。磁盤尋道一次需要10ms,並且一次只能尋一個,因此並行化是受限的。
直覺上來講,一個持久化的隊列可以構建在對一個文件的讀和追加上,就像一般情況下的日志解決方案。盡管和B樹相比,這種結構不能支持豐富的語義,但是它有一個優點,所有的操作都是常數時間,並且讀寫之間不會相互阻塞。這種設計具有極大的性能優勢:最終系統性能和數據大小完全無關,服務器可以充分利用廉價的硬盤來提供高效的消息服務。
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味着我們可以提供一般消息系統無法提供的特性。比如說,消息被消費后不是立馬被刪除,我們可以將這些消息保留一段相對比較長的時間(比如一個星期)。

4.4.3 進一步提高效率

我們已經為效率做了非常多的努力。但是有一種非常主要的應用場景是:處理Web活動數據,它的特點是數據量非常大,每一次的網頁瀏覽都會產生大量的寫操作。更進一步,我們假設每一個被發布的消息都會被至少一個consumer消費,因此我們更要怒路讓消費變得更廉價。
通過上面的介紹,我們已經解決了磁盤方面的效率問題,除此之外,在此類系統中還有兩類比較低效的場景:

  • 太多小的I/O操作
  • 過多的字節拷貝

為了減少大量小I/O操作的問題,kafka的協議是圍繞消息集合構建的。Producer一次網絡請求可以發送一個消息集合,而不是每一次只發一條消息。在server端是以消息塊的形式追加消息到log中的,consumer在查詢的時候也是一次查詢大量的線性數據塊。消息集合即MessageSet,實現本身是一個非常簡單的API,它將一個字節數組或者文件進行打包。所以對消息的處理,這里沒有分開的序列化和反序列化的上步驟,消息的字段可以按需反序列化(如果沒有需要,可以不用反序列化)。
另一個影響效率的問題就是字節拷貝。為了解決字節拷貝的問題,kafka設計了一種“標准字節消息”,Producer、Broker、Consumer共享這一種消息格式。Kakfa的message log在broker端就是一些目錄文件,這些日志文件都是MessageSet按照這種“標准字節消息”格式寫入到磁盤的。
維持這種通用的格式對這些操作的優化尤為重要:持久化log 塊的網絡傳輸。流行的unix操作系統提供了一種非常高效的途徑來實現頁面緩存和socket之間的數據傳遞。在Linux操作系統中,這種方式被稱作:sendfile system call(Java提供了訪問這個系統調用的方法:FileChannel.transferTo api)。

為了理解sendfile的影響,需要理解一般的將數據從文件傳到socket的路徑:

  1. 操作系統將數據從磁盤讀到內核空間的頁緩存中
  2. 應用將數據從內核空間讀到用戶空間的緩存中
  3. 應用將數據寫回內核空間的socket緩存中
  4. 操作系統將數據從socket緩存寫到網卡緩存中,以便將數據經網絡發出

這種操作方式明顯是非常低效的,這里有四次拷貝,兩次系統調用。如果使用sendfile,就可以避免兩次拷貝:操作系統將數據直接從頁緩存發送到網絡上。所以在這個優化的路徑中,只有最后一步將數據拷貝到網卡緩存中是需要的。
我們期望一個主題上有多個消費者是一種常見的應用場景。利用上述的zero-copy,數據只被拷貝到頁緩存一次,然后就可以在每次消費時被重得利用,而不需要將數據存在內存中,然后在每次讀的時候拷貝到內核空間中。這使得消息消費速度可以達到網絡連接的速度。這樣以來,通過頁面緩存和sendfile的結合使用,整個kafka集群幾乎都已以緩存的方式提供服務,而且即使下游的consumer很多,也不會對整個集群服務造成壓力。
關於sendfile和zero-copy,請參考:zero-copy

五、Kafka集群部署

5.1 集群部署

為了提高性能,推薦采用專用的服務器來部署kafka集群,盡量與hadoop集群分開,因為kafka依賴磁盤讀寫和大的頁面緩存,如果和hadoop共享節點的話會影響其使用頁面緩存的性能。
Kafka集群的大小需要根據硬件的配置、生產者消費者的並發數量、數據的副本個數、數據的保存時長綜合確定。
磁盤的吞吐量尤為重要,因為通常kafka的瓶頸就在磁盤上。
Kafka依賴於zookeeper,建議采用專用服務器來部署zookeeper集群,zookeeper集群的節點采用偶數個,一般建議用3、5、7個。注意zookeeper集群越大其讀寫性能越慢,因為zookeeper需要在節點之間同步數據。一個3節點的zookeeper集群允許一個節點失敗,一個5節點集群允許2個幾點失敗。

5.2 集群大小

有很多因素決定着kafka集群需要具備存儲能力的大小,最准確的衡量辦法就是模擬負載來測算一下,Kafka本身也提供了負載測試的工具。
如果不想通過模擬實驗來評估集群大小,最好的辦法就是根據硬盤的空間需求來推算。下面我就根據網絡和磁盤吞吐量需求來做一下估算。
我們做如下假設:

  • W:每秒寫多少MB
  • R :副本數
  • C :Consumer的數量

一般的來說,kafka集群瓶頸在於網絡和磁盤吞吐量,所以我們先評估一下集群的網絡和磁盤需求。
對於每條消息,每個副本都要寫一遍,所以整體寫的速度是W*R。讀數據的部分主要是集群內部各個副本從leader同步消息讀和集群外部的consumer讀,所以集群內部讀的速率是(R-1)*W,同時,外部consumer讀的速度是C*W,因此:

  • Write:W*R
  • Read:(R-1)*W+C*W

需要注意的是,我們可以在讀的時候緩存部分數據來減少IO操作,如果一個集群有M MB內存,寫的速度是W MB/sec,則允許M/(W*R) 秒的寫可以被緩存。如果集群有32GB內存,寫的速度是50MB/s的話,則可以至少緩存10分鍾的數據。

5.3 Kafka性能測試

Performance testing

5.4 Kafka在zookeeper中的數據結構

Kafka data structures in Zookeeper

六、Kafka主要配置

6.1 Broker Config

屬性 默認值 描述
broker.id   必填參數,broker的唯一標識
log.dirs /tmp/kafka-logs Kafka數據存放的目錄。可以指定多個目錄,中間用逗號分隔,當新partition被創建的時會被存放到當前存放partition最少的目錄。
port 9092 BrokerServer接受客戶端連接的端口號
zookeeper.connect null Zookeeper的連接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3。可以填一個或多個,為了提高可靠性,建議都填上。注意,此配置允許我們指定一個zookeeper路徑來存放此kafka集群的所有數據,為了與其他應用集群區分開,建議在此配置中指定本集群存放目錄,格式為:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消費者的參數要和此參數一致。
message.max.bytes 1000000 服務器可以接收到的最大的消息大小。注意此參數要和consumer的maximum.message.size大小一致,否則會因為生產者生產的消息太大導致消費者無法消費。
num.io.threads 8 服務器用來執行讀寫請求的IO線程數,此參數的數量至少要等於服務器上磁盤的數量。
queued.max.requests 500 I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程將停止接收新的請求。
socket.send.buffer.bytes 100 * 1024 The SO_SNDBUFF buffer the server prefers for socket connections.
socket.receive.buffer.bytes 100 * 1024 The SO_RCVBUFF buffer the server prefers for socket connections.
socket.request.max.bytes 100 * 1024 * 1024 服務器允許請求的最大值, 用來防止內存溢出,其值應該小於 Java heap size.
num.partitions 1 默認partition數量,如果topic在創建時沒有指定partition數量,默認使用此值,建議改為5
log.segment.bytes 1024 * 1024 * 1024 Segment文件的大小,超過此值將會自動新建一個segment,此值可以被topic級別的參數覆蓋。
log.roll.{ms,hours} 24 * 7 hours 新建segment文件的時間,此值可以被topic級別的參數覆蓋。
log.retention.{ms,minutes,hours} 7 days Kafka segment log的保存周期,保存周期超過此時間日志就會被刪除。此參數可以被topic級別參數覆蓋。數據量大時,建議減小此值。
log.retention.bytes -1 每個partition的最大容量,若數據量超過此值,partition數據將會被刪除。注意這個參數控制的是每個partition而不是topic。此參數可以被log級別參數覆蓋。
log.retention.check.interval.ms 5 minutes 刪除策略的檢查周期
auto.create.topics.enable true 自動創建topic參數,建議此值設置為false,嚴格控制topic管理,防止生產者錯寫topic。
default.replication.factor 1 默認副本數量,建議改為2。
replica.lag.time.max.ms 10000 在此窗口時間內沒有收到follower的fetch請求,leader會將其從ISR(in-sync replicas)中移除。
replica.lag.max.messages 4000 如果replica節點落后leader節點此值大小的消息數量,leader節點就會將其從ISR中移除。
replica.socket.timeout.ms 30 * 1000 replica向leader發送請求的超時時間。
replica.socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests to the leader for replicating data.
replica.fetch.max.bytes 1024 * 1024 The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
replica.fetch.wait.max.ms 500 The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
num.replica.fetchers 1 Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
fetch.purgatory.purge.interval.requests 1000 The purge interval (in number of requests) of the fetch request purgatory.
zookeeper.session.timeout.ms 6000 ZooKeeper session 超時時間。如果在此時間內server沒有向zookeeper發送心跳,zookeeper就會認為此節點已掛掉。 此值太低導致節點容易被標記死亡;若太高,.會導致太遲發現節點死亡。
zookeeper.connection.timeout.ms 6000 客戶端連接zookeeper的超時時間。
zookeeper.sync.time.ms 2000 H ZK follower落后 ZK leader的時間。
controlled.shutdown.enable true 允許broker shutdown。如果啟用,broker在關閉自己之前會把它上面的所有leaders轉移到其它brokers上,建議啟用,增加集群穩定性。
auto.leader.rebalance.enable true If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.
leader.imbalance.per.broker.percentage 10 The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.
leader.imbalance.check.interval.seconds 300 The frequency with which to check for leader imbalance.
offset.metadata.max.bytes 4096 The maximum amount of metadata to allow clients to save with their offsets.
connections.max.idle.ms 600000 Idle connections timeout: the server socket processor threads close the connections that idle more than this.
num.recovery.threads.per.data.dir 1 The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
unclean.leader.election.enable true Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
delete.topic.enable false 啟用deletetopic參數,建議設置為true。
offsets.topic.num.partitions 50 The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).
offsets.topic.retention.minutes 1440 Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.
offsets.retention.check.interval.ms 600000 The frequency at which the offset manager checks for stale offsets.
offsets.topic.replication.factor 3 The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.
offsets.topic.segment.bytes 104857600 Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.
offsets.load.buffer.size 5242880 An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.
offsets.commit.required.acks -1 The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.
offsets.commit.timeout.ms 5000 The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

6.2 Producer Config

屬性 默認值 描述
metadata.broker.list   啟動時producer查詢brokers的列表,可以是集群中所有brokers的一個子集。注意,這個參數只是用來獲取topic的元信息用,producer會從元信息中挑選合適的broker並與之建立socket連接。格式是:host1:port1,host2:port2。
request.required.acks 0 參見3.2節介紹
request.timeout.ms 10000 Broker等待ack的超時時間,若等待時間超過此值,會返回客戶端錯誤信息。
producer.type sync 同步異步模式。async表示異步,sync表示同步。如果設置成異步模式,可以允許生產者以batch的形式push數據,這樣會極大的提高broker性能,推薦設置為異步。
serializer.class kafka.serializer.DefaultEncoder 序列號類,.默認序列化成 byte[] 。
key.serializer.class   Key的序列化類,默認同上。
partitioner.class kafka.producer.DefaultPartitioner Partition類,默認對key進行hash。
compression.codec none 指定producer消息的壓縮格式,可選參數為: “none”, “gzip” and “snappy”。關於壓縮參見4.1節
compressed.topics null 啟用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那么壓縮僅對本參數指定的topic有效,若本參數為空,則對所有topic有效。
message.send.max.retries 3 Producer發送失敗時重試次數。若網絡出現問題,可能會導致不斷重試。
retry.backoff.ms 100 Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
topic.metadata.refresh.interval.ms 600 * 1000 The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed
queue.buffering.max.ms 5000 啟用異步模式時,producer緩存消息的時間。比如我們設置成1000時,它會緩存1秒的數據再一次發送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
queue.buffering.max.messages 10000 采用異步模式時producer buffer 隊列里最大緩存的消息數量,如果超過這個數值,producer就會阻塞或者丟掉消息。
queue.enqueue.timeout.ms -1 當達到上面參數值時producer阻塞等待的時間。如果值設置為0,buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值設置為-1,producer會被阻塞,不會丟消息。
batch.num.messages 200 采用異步模式時,一個batch緩存的消息數量。達到這個數量值時producer才會發送消息。
send.buffer.bytes 100 * 1024 Socket write buffer size
client.id “” The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

6.3 Consumer Config

屬性 默認值 描述
group.id   Consumer的組ID,相同goup.id的consumer屬於同一個組。
zookeeper.connect   Consumer的zookeeper連接串,要和broker的配置一致。
consumer.id null 如果不設置會自動生成。
socket.timeout.ms 30 * 1000 網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定。
socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests.
fetch.message.max.bytes 1024 * 1024 查詢topic-partition時允許的最大消息大小。consumer會為每個partition緩存此大小的消息到內存,因此,這個參數可以控制consumer的內存使用量。這個值應該至少比server允許的最大消息大小大,以免producer發送的消息大於consumer允許的消息。
num.consumer.fetchers 1 The number fetcher threads used to fetch data.
auto.commit.enable true 如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。
auto.commit.interval.ms 60 * 1000 Consumer提交offset值到zookeeper的周期。
queued.max.message.chunks 2 用來被consumer消費的message chunks 數量, 每個chunk可以緩存fetch.message.max.bytes大小的數據量。
rebalance.max.retries 4 When a new consumer joins a consumer group the set of consumers attempt to “rebalance” the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.
fetch.min.bytes 1 The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
fetch.wait.max.ms 100 The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.
rebalance.backoff.ms 2000 Backoff time between retries during rebalance.
refresh.leader.backoff.ms 200 Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
auto.offset.reset largest What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer
consumer.timeout.ms -1 若在指定時間內沒有消息消費,consumer將會拋出異常。
exclude.internal.topics true Whether messages from internal topics (such as offsets) should be exposed to the consumer.
zookeeper.session.timeout.ms 6000 ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
zookeeper.connection.timeout.ms 6000 The max time that the client waits while establishing a connection to zookeeper.
zookeeper.sync.time.ms 2000 How far a ZK follower can be behind a ZK leader

6.4 Topic 級別的配置

topic-config

 

 

 

 

 

 

 

Kafka史上最詳細原理總結

Kafka
Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源 項目。
 
1.前言
消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。下面將從Kafka文件存儲機制和物理結構角度,分析Kafka是如何實現高效文件存儲,及實際應用效果。
 
 1.1  Kafka的特性:
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失
- 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高並發:支持數千個客戶端同時讀寫
 
1.2   Kafka的使用場景:
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
 
1.3  Kakfa的設計思想
- Kakfa Broker Leader的選舉: Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節點一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時所有的kafka broker又會一起去 Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower 。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,並選取ISR列表中的一個replica作為partition leader(如果ISR列表中的replica全掛,選一個幸存的replica作為leader; 如果該partition的所有的replica都宕機了,則將新的leader設置為-1,等待恢復,等待ISR中的任一個Replica“活”過來,並且選它作為Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。
這里曾經發生過一個bug,TalkingData使用Kafka0.8.1的時候,kafka controller在Zookeeper上注冊成功后,它和Zookeeper通信的timeout時間是6s,也就是如果kafka controller如果有6s中沒有和Zookeeper做心跳,那么Zookeeper就認為這個kafka controller已經死了,就會在Zookeeper上把這個臨時節點刪掉,那么其他Kafka就會認為controller已經沒了,就會再次搶着注冊臨時節點,注冊成功的那個kafka broker成為controller,然后,之前的那個kafka controller就需要各種shut down去關閉各種節點和事件的監聽。但是當kafka的讀寫流量都非常巨大的時候,TalkingData的一個bug是,由於網絡等原因,kafka controller和Zookeeper有6s中沒有通信,於是重新選舉出了一個新的kafka controller,但是原來的controller在shut down的時候總是不成功,這個時候producer進來的message由於Kafka集群中存在兩個kafka controller而無法落地。導致數據淤積。
這里曾經還有一個bug,TalkingData使用Kafka0.8.1的時候,當ack=0的時候,表示producer發送出去message,只要對應的kafka broker topic partition leader接收到的這條message,producer就返回成功,不管partition leader 是否真的成功把message真正存到kafka。當ack=1的時候,表示producer發送出去message,同步的把message存到對應topic的partition的leader上,然后producer就返回成功,partition leader異步的把message同步到其他partition replica上。當ack=all或-1,表示producer發送出去message,同步的把message存到對應topic的partition的leader和對應的replica上之后,才返回成功。但是如果某個kafka controller 切換的時候,會導致partition leader的切換(老的kafka controller上面的partition leader會選舉到其他的kafka broker上),但是這樣就會導致丟數據。
-  Consumergroup:各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被 組(Consumer group ) 中的一個consumer( consumer 線程 )消費,如果一個message可以被多個 consumer( consumer 線程 ) 消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啟動一個新的consumer group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)並發處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業務都需要這個topic的數據,起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
    當啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那么這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。
    同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不能夠一個consumer group的多個consumer同時消費一個partition。
    一個consumer group下,無論有多少個consumer,這個consumer group一定回去把這個topic下所有的partition都消費了。當consumer group里面的consumer數量小於這個topic下的partition數量的時候,如下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。如果 consumer group里面的consumer數量等於這個topic下的partition數量的時候,如下圖groupC,此時效率是最高的,每個partition都有一個consumer thread去消費。當 consumer group里面的consumer數量大於這個topic下的partition數量的時候,如下圖GroupD,就會有一個consumer thread空閑。因此,我們在設定consumer group的時候,只需要指明里面有幾個consumer數量即可,無需指定對應的消費partition序號,consumer會自動進行rebalance。
    多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以o(1)的方式順序的讀取message去消費,,所以一定會重復消費這批message的,不能向AMQ那樣多個BET作為consumer消費(對message加鎖,消費的時候不能重復消費message)
- Consumer Rebalance的觸發條件:(1)Consumer增加或刪除會觸發 Consumer Group的Rebalance (2)Broker的增加或者減少都會觸發 Consumer Rebalance
- Consumer: Consumer處理partition里面的message的時候是o(1)順序讀取的。所以必須維護着上一次讀到哪里的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也可以配置成讀完消息處理再commit,這種情況下consumer端的響應就會比較慢的,需要等處理完才行。
一般情況下,一定是一個consumer group處理一個topic的message。Best Practice是這個consumer group里面consumer的數量等於topic里面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。如果這個consumer group里面consumer的數量小於topic里面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic里面的所有partition都會被處理到的。。如果這個consumer group里面consumer的數量大於topic里面partition的數量,多出的consumer thread就會閑着啥也不干,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因為一個partition不可能被兩個consumer thread去處理。 所以我們線上的分布式多個service服務,每個service里面的kafka consumer數量都小於對應的topic的partition數量,但是所有服務的consumer數量只和等於partition的數量,這是因為分布式service服務的所有consumer都來自一個consumer group,如果來自不同的consumer group就會處理重復的message了(同一個consumer group下的consumer不能處理同一個partition,不同的consumer group可以處理同一個topic,那么都是順序處理message,一定會處理重復的。一般這種情況都是兩個不同的業務邏輯,才會啟動兩個consumer group來處理一個topic)。
 
如果producer的流量增大,當前的topic的parition數量=consumer數量,這時候的應對方式就是很想擴展:增加topic下的partition,同時增加這個consumer group下的consumer。
                 
- Delivery Mode : Kafka producer 發送message不用維護message的offsite信息,因為這個時候,offsite就相當於一個自增id,producer就盡管發送message就好了。而且Kafka與AMQ不同,AMQ大都用在處理業務邏輯上,而Kafka大都是日志,所以Kafka的producer一般都是大批量的batch發送message,向這個topic一次性發送一大批message,load balance到一個partition上,一起插進去,offsite作為自增id自己增加就好。但是Consumer端是需要維護這個partition當前消費到哪個message的offsite信息的,這個offsite信息,high level api是維護在Zookeeper上,low level api是自己的程序維護。(Kafka管理界面上只能顯示high level api的consumer部分,因為low level api的partition offsite信息是程序自己維護,kafka是不知道的,無法在管理界面上展示 )當使用high level api的時候,先拿message處理,再定時自動commit offsite+1(也可以改成手動), 並且kakfa處理message是沒有鎖操作的。因此如果處理message失敗,此時還沒有commit offsite+1,當consumer thread重啟后會重復消費這個message。但是作為高吞吐量高並發的實時處理系統,at least once的情況下,至少一次會被處理到,是可以容忍的。如果無法容忍,就得使用low level api來自己程序維護這個offsite信息,那么想什么時候commit offsite+1就自己搞定了。
 
- Topic & Partition:Topic相當於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行load balance,均勻的分布在這個topic下的不同的partition上( hash(message) % [broker數量]  )。物理上存儲上,這個topic會分成一個或多個partition,每個partiton相當於是一個子queue。在物理結構上,每個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic可以有無數多的partition,根據業務需求和數據量來設置。在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在 創建Topic時通過參數指定parittion數量。 Topic創建之后通過Kafka提供的工具也可以修改partiton數量。
   一般來說,(1)一個Topic的Partition數量大於等於Broker的數量, 可以提高吞吐率。(2)同一個Partition的Replica盡量分散到不同的機器, 高可用。
  當add a new partition的時候,partition里面的message不會重新進行分配,原來的partition里面的message數據不會變,新加的這個partition剛開始是空的,隨后進入這個topic的message就會重新參與所有partition的load balance
- Partition Replica:每個partition可以在其他的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集群。存replica副本的方式是按照kafka broker的順序存。例如有5個kafka broker節點,某個topic有3個partition,每個partition存2個副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大於kafka broker節點的數目,否則報錯。這里的replica數其實就是partition的副本總數,其中包括一個leader,其他的就是copy副本)。這樣如果某個broker宕機,其實整個kafka內數據依然是完整的。但是,replica副本數越高,系統雖然越穩定,但是回來帶資源和性能上的下降;replica副本少的話,也會造成系統丟數據的風險。
  (1)怎樣傳送消息:producer先把message發送到partition leader,再由leader發送給其他partition follower。(如果讓producer發送給每個replica那就太慢了)
  (2) 在向Producer發送ACK前需要保證有多少個Replica已經收到該消息:根據ack配的個數而定
  (3) 怎樣處理某個Replica不工作的情況:如果這個部工作的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什么問題。如果這個不工作的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工作的partition replca寫message成功,但是會等到time out,然后返回失敗因為某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工作的partition replica從ack列表中移除,以后的producer發送message的時候就不會有這個ack列表下的這個部工作的partition replica了。 
  (4) 怎樣處理Failed Replica恢復回來的情況:如果這個partition replica之前不在ack列表中,那么啟動后重新受Zookeeper管理即可,之后producer發送message的時候,partition leader會繼續發送message到這個partition follower上。如果這個 partition replica之前在ack列表中,此時重啟后,需要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工作的partition replica的時候自動從ack列表中移除的)
- Partition leader與follower:partition也有leader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其他的partition follower。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節點宕機,zookeeper會沖其他的broker的partition follower上選擇follower變為parition leader。
- Topic分配partition和partition replica的算法:(1)將Broker(size=n)和待分配的Partition排序。(2) 將第i個Partition分配到第(i%n)個Broker上。(3) 將第i個Partition的第j個Replica分配到第((i + j) % n)個Broker上
 
- 消息投遞可靠性
一個消息如何算投遞成功,Kafka提供了三種模式:
- 第一種是啥都不管,發送出去就當作成功,這種情況當然不能保證消息成功投遞到broker;
- 第二種是Master-Slave模型,只有當Master和所有Slave都接收到消息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數情況下都會中和可靠性和性能選擇第三種模型
  消息在broker上的可靠性,因為消息會持久化到磁盤上,所以如果正常stop一個broker,其上的數據不會丟失;但是如果不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這可以通過配置flush頁面緩存的周期、閾值緩解,但是同樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際情況配置。
  消息消費的可靠性,Kafka提供的是“At least once”模型,因為消息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper里,但是當消息消費后consumer掛掉,offset沒有即時寫回,就有可能發生重復讀的情況,這種情況同樣可以通過調整commit offset周期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務解決,但是如果你的應用不在乎重復消費,那就干脆不要解決,以換取最大的性能。
 
- Partition ack:當ack=1,表示producer寫partition leader成功后,broker就返回成功,無論其他的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其他一個follower成功的時候, broker就返回成功,無論其他的partition follower是否寫成功。當ack=-1 [parition的數量]的時候,表示只有producer全部寫成功的時候,才算成功,kafka broker才返回成功信息。這里需要注意的是,如果ack=1的時候,一旦有個broker宕機導致partition的follower和leader切換,會導致丟數據。
   
- message狀態 :在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
- message持久化:Kafka中會把消息持久化到本地文件系統中,並且保持o(1)極高的效率。我們眾所周知IO讀取是非常耗資源的性能也是最慢的,這就是為了數據庫的瓶頸經常在IO上,需要換SSD硬盤的原因。但是Kafka作為吞吐量極高的MQ,卻可以非常高效的message持久化到文件。這是因為Kafka是順序寫入o(1)的時間復雜度,速度非常快。也是高吞吐量的原因。由於message的寫入持久化是順序寫入的,因此message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。一般的機器,單機每秒100k條數據。
- message有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。
- Produer : Producer向Topic發送message,不需要指定partition,直接發送就好了。kafka通過partition ack來控制是否發送成功並把信息返回給producer,producer可以有任意多的thread,這些kafka服務器端是不care的。Producer端的delivery guarantee默認是At least once的。也可以設置Producer異步發送實現At most once。Producer可以用主鍵冪等性實現Exactly once
- Kafka高吞吐量: Kafka的高吞吐量體現在讀寫上,分布式並發的讀和寫都非常快,寫的性能體現在以o(1)的時間復雜度進行順序寫入。讀的性能 體現在以o(1)的時間復雜度進行順序讀取, 對topic進行partition分區,consume group中的consume線程可以以很高能性能進行順序讀。
- Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會丟,絕對不會重復傳輸;(2)At least once 消息絕對不會丟,但是可能會重復傳輸;(3)Exactly once每條信息肯定會被傳輸一次且僅傳輸一次,這是用戶想要的。
- 批量發送:Kafka支持以消息集合為單位進行批量發送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產和消費是異步的。
- Kafka集群中broker之間的關系:不是主從關系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節點。
- 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
- 同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。
- 分區機制partition:Kafka的broker端支持消息分區partition,Producer可以決定把消息發到哪個partition,在一個 partition 中message的順序就是Producer發送消息的順序,一個topic中可以有多個partition,具體partition的數量是可配置的。partition的概念使得kafka作為MQ可以橫向擴展,吞吐量巨大。partition可以設置replica副本,replica副本存在不同的kafka broker節點上,第一個partition是leader,其他的是follower,message先寫到partition leader上,再由partition leader push到parition follower上。所以說kafka可以水平擴展,也就是擴展partition。
- 離線數據裝載:Kafka由於對可拓展的數據持久化的支持,它也非常適合向Hadoop或者數據倉庫中進行數據裝載。
- 實時數據與離線數據:kafka既支持離線數據也支持實時數據,因為kafka的message持久化到文件,並可以設置有效期,因此可以把kafka作為一個高效的存儲來使用,可以作為離線數據供后面的分析。當然作為分布式實時消息系統,大多數情況下還是用於實時的數據處理的,但是當cosumer消費能力下降的時候可以通過message的持久化在淤積數據在kafka。
- 插件支持:現在不少活躍的社區已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。
- 解耦:  相當於一個MQ,使得Producer和Consumer之間異步的操作,系統之間解耦
- 冗余:  replica有多個副本,保證一個broker node宕機后不會影響整個服務
- 擴展性:  broker節點可以水平擴展,partition也可以水平增加,partition replica也可以水平增加
- 峰值:  在訪問量劇增的情況下,kafka水平擴展, 應用仍然需要繼續發揮作用
- 可恢復性:  系統的一部分組件失效時,由於有partition的replica副本,不會影響到整個系統。
- 順序保證性:由於kafka的producer的寫message與consumer去讀message都是順序的讀寫,保證了高效的性能。
- 緩沖:由於producer那面可能業務很簡單,而后端consumer業務會很復雜並有數據庫的操作,因此肯定是producer會比consumer處理速度快,如果沒有kafka,producer直接調用consumer,那么就會造成整個系統的處理速度慢,加一層kafka作為MQ,可以起到緩沖的作用。
- 異步通信:作為MQ,Producer與Consumer異步通信

2.Kafka文件存儲機制

2.1 Kafka部分名詞解釋如下:
 
     Kafka中發布訂閱的對象是topic。我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
  • Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
  • Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
  • Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
  • Segment:partition物理上由多個segment組成,每個Segment存着message信息
  • Producer : 生產message發送到topic
  • Consumer : 訂閱topic消費message, consumer作為一個線程來消費
  • Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理, 除非來自不同的consumer group。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。

  • 2.2 kafka一些原理概念
1.持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且無論任何OS下,對文件系統本身的優化是非常艱難的.文件緩存/直接內存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.對於kafka而言,較高性能的磁盤,將會帶來更加直接的性能提升.
 
2.性能
除磁盤IO之外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次copy和交換(這里涉及到"磁盤IO數據"/"內核內存"/"進程內存"/"網絡緩沖區",多者之間的數據copy).
其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網絡IO更應該需要考慮.可以將任何在網絡上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式.
 
3.負載均衡
kafka集群中的任何一個broker,都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".
異步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率;不過這也有一定的隱患,比如當producer失效時,那些尚未發送的消息將會丟失。
 
4.Topic模型
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之后,consumer可以在本地保存最后消息的offset,並間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級。
kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也適度的減輕了broker端設計的復雜度;這是和眾多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不同,kafka中的消息是批量(通常以消息的條數或者chunk的尺寸為單位)發送給consumer,當消息消費成功后,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬松"的設計,將會有"丟失"消息/"消息重發"的危險.
 
5.消息傳輸一致
Kafka提供3種消息傳輸一致性語義:最多1次,最少1次,恰好1次。
最少1次:可能會重傳數據,有可能出現數據被重復處理的情況;
最多1次:可能會出現數據丟失情況;
恰好1次:並不是指真正只傳輸1次,只不過有一個機制。確保不會出現“數據被重復處理”和“數據丟失”的情況。
 
at most once: 消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中consumer進程失效(crash),導致部分消息未能繼續處理.那么此后可能其他consumer會接管,但是因為offset已經提前保存,那么新的consumer將不能fetch到offset之前的消息(盡管它們尚沒有被處理),這就是"at most once".
at least once: 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常或者consumer失效,導致保存offset操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once".
"Kafka Cluster"到消費者的場景中可以采取以下方案來得到“恰好1次”的一致性語義:
最少1次+消費者的輸出中額外增加已處理消息最大編號:由於已處理消息最大編號的存在,不會出現重復處理消息的情況。
 
6.副本
kafka中,replication策略是基於partition,而不是topic;kafka將每個partition數據復制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日志中;leader負責跟蹤所有的follower狀態,如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條消息保存成功,此消息才被認為是"committed",那么此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網絡環境.即使只有一個replicas實例存活,仍然可以保證消息的正常發送和接收,只要zookeeper集群存活即可.
選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader.
 
7.log
每個log entry格式為"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每個日志都有一個offset來唯一的標記一條消息,offset的值為8個字節的數字,表示此消息在此partition中所處的起始位置..每個partition在物理存儲層面,有多個log file組成(稱為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
獲取消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,可以找到此消息所在segment文件,然后根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.
 
8.分布式
kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)
Broker node registry: 當一個kafka broker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.
Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.
Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.
Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那么將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"游離"的partitions)
當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然后在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
 
總結:
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連接並發送消息.
2) Broker端使用zookeeper用來注冊broker信息,已經監測partition leader存活性.
3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息。
 
9.Leader的選擇
Kafka的核心是日志文件,日志文件在集群中的同步是分布式數據系統最基礎的要素。
如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統采用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。
Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取並追加到日志中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。
一個邪惡的想法:如果所有節點都down掉了怎么辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。
實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:
1. 等待ISR中的任何一個節點恢復並擔任leader。
2. 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.
這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。
這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。
 
10.副本管理
以上僅僅以一個topic一個分區為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka盡量的使所有分區均勻的分布到集群所有的節點上而不是集中在某些節點上,另外主從關系也盡量均衡這樣每個幾點都會擔任一定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活着的節點中的一個會備切換為新的controller.
 
11.Leader與副本同步
對於某個分區來說,保存正分區的"broker"為該分區的"leader",保存備份分區的"broker"為該分區的"follower"。備份分區會完全復制正分區的消息,包括消息的編號等附加屬性值。為了保持正分區和備份分區的內容一致,Kafka采取的方案是在保存備份分區的"broker"上開啟一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。一般情況下,一個分區有一個“正分區”和零到多個“備份分區”。可以配置“正分區+備份分區”的總數量,關於這個配置,不同主題可以有不同的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通信。
 
Kafka允許topic的分區擁有若干副本,這個數量是可以配置的,你可以為每個topic配置副本的數量。Kafka會自動在每個副本上備份數據,所以當一個節點down掉時數據依然是可用的。
Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份數據。
創建副本的單位是topic的分區,每個分區都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區的數量都比broker的數量多的多,各分區的leader均勻的分布在brokers中。所有的followers都復制leader的日志,日志中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那里拉取消息並保存在自己的日志文件中。
許多分布式的消息系統自動的處理失敗的請求,它們對一個節點是否着(alive)”有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
1. 節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接。
2. 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。
符合以上條件的節點准確的說應該是“同步中的(in sync)”,而不是模糊的說是“活着的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是“太久”,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被所有的副本加入到日志中時,才算是“committed”,只有committed的消息才會發送給consumer,這樣就不用擔心一旦leader down掉了消息會丟失。Producer也可以選擇是否等待消息被提交的通知,這個是由參數acks決定的。
Kafka保證只要有一個“同步中”的節點,“committed”的消息就不會丟失。
 
 
  • 2.3  kafka拓撲結構

       一個典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服務器日志等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集群。Kafka通過Zookeeper管理Kafka集群配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,因為consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。
 

分析過程分為以下4個步驟:

  • topic中partition存儲分布
  • partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
  • partiton中segment文件存儲結構
  • 在partition中如何通過offset查找message

通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。

 
2.3 topic中partition存儲分布

假設實驗環境中Kafka集群只有一個broker,xxx/message-folder為數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如創建2個topic名 稱分別為report_push、launch_info, partitions數量都為partitions=4

存儲路徑和目錄規則為:

xxx/message-folder

  |--report_push-0
  |--report_push-1
  |--report_push-2
  |--report_push-3
  |--launch_info-0
  |--launch_info-1
  |--launch_info-2
  |--launch_info-3
 
在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為 topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition組成,其組織結構如下圖所示:
 
我們可以看到,Partition是一個Queue的結構,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition上,其中的每一個消息都被賦予了一個唯一的offset值。
 
Kafka集群會保存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,只有過期的數據才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內的所有消息都會被保存到集群中,數據只有超過了兩天才會被清除。
 
Kafka只維護在Partition中的offset值,因為這個offsite標識着這個partition的message消費到哪條了。Consumer每消費一個消息,offset就會加1。其實消息的狀態完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。
 
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;第二就是可以提高並發,因為可以以Partition為單位讀寫了。
 
通過上面介紹的我們可以知道,kafka中的數據是持久化的並且能夠容錯的。Kafka允許用戶為每個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。如果你的副本數量設置為3,那么一份數據就會被存放在3台不同的機器上,那么就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到數據消費。如果對數據持久化有更高的要求,可以把副本數量設置為3或者更多。
 
Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的message的數量。Producer在生產數據時,會按照一定規則(這個規則是可以自定義的)把消息發布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。
 
關於如何設置partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。所以,推薦partition的數量一定要大於同時運行的consumer的數量。另外一方面,建議partition的數量大於集群broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內存來緩存消息數據,如果partition數量越大,就要為kafka分配更大的heap space。
2.4 partiton中文件存儲方式
 
  • 每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
  • 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。

這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。

2.5 partiton中segment文件存儲結構
producer發message到某個topic,message會被均勻的分布到多個partition上(隨機或根據用戶指定的回調函數進行分布),kafka broker收到message往對應partition的最后一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息consumer才能消費,segment達到一定的大小后將不會再往該segment寫數據,broker會創建新的segment。
 
每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。
  • segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件.
  • segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個全局partion的最大offset(偏移message數)。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。
 
每個segment中存儲很多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。

下面文件列表是筆者在Kafka broker上做的一個實驗,創建一個topicXXX包含1 partition,設置每個segment大小為500MB,並啟動producer向Kafka broker寫入大量數據,如下圖2所示segment文件列表形象說明了上述2個規則:

以上述圖2中一對segment file文件為例,說明segment中index<—->data file對應關系物理結構如下:

上述圖3中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。其中以索引文件中 元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移 地址為497。

從上述圖3了解到segment data file由許多message組成,下面詳細說明message物理結構如下:

參數說明:

關鍵字 解釋說明
8 byte offset 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱為偏移(offset),它可以唯一確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校驗message
1 byte “magic" 表示本次發布Kafka服務程序協議版本號
1 byte “attributes" 表示為獨立版本、或標識壓縮類型、或編碼類型。
4 byte key length 表示key的長度,當key為-1時,K byte key字段不填
K byte key 可選
value bytes payload 表示實際消息數據。
 
2.6 在partition中如何通過offset查找message

例如讀取offset=368776的message,需要通過下面2個步驟查找。

  • 第一步查找segment file

    上述圖2為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件 00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個文件00000000000000737337.index的起始偏移量為737338=737337 + 1,其他后續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。

    當offset=368776時定位到00000000000000368769.index|log

  • 第二步通過segment file查找message通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和 00000000000000368769.log的物理偏移地址,然后再通過00000000000000368769.log順序查找直到 offset=368776為止。

segment index file采取稀疏索引存儲方式,它減少索引文件大小,通過mmap可以直接內存操作,稀疏索引為數據文件的每個對應message設置一個元數據指針,它 比稠密索引節省了更多的存儲空間,但查找起來需要消耗更多的時間。
 
kafka會記錄offset到zk中。但是,zk client api對zk的頻繁寫入是一個低效的操作。0.8.2 kafka引入了native offset storage,將offset管理從zk移出,並且可以做到水平擴展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic與partion的組合作為key直接提交到compacted topic中。同時Kafka又在內存中維護了的三元組來維護最新的offset信息,consumer來取最新offset信息的時候直接內存里拿即可。當然,kafka允許你快速的checkpoint最新的offset信息到磁盤上。
 
3.Partition Replication原則

Kafka高效文件存儲設計特點

  • Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
  • 通過索引信息可以快速定位message和確定response的最大大小。
  • 通過index元數據全部映射到memory,可以避免segment file的IO磁盤操作。
  • 通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。
 
 

1. Kafka集群partition replication默認自動分配分析

下面以一個Kafka集群中4個Broker舉例,創建1個topic包含4個Partition,2 Replication;數據Producer流動如圖所示:

(1)

 

 

(2)當集群中新增2節點,Partition增加到6個時分布情況如下:

 

副本分配邏輯規則如下:

  • 在Kafka集群中,每個Broker都有均等分配Partition的Leader機會。
  • 上述圖Broker Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。
  • 上述圖種每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此循環迭代分配,多副本都遵循此規則。
 
副本分配算法如下:
  • 將所有N Broker和待分配的i個Partition排序.
  • 將第i個Partition分配到第(i mod n)個Broker上.
  • 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.
 
4.Kafka Broker一些特性
4.1 無狀態的Kafka Broker :
1. Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。
2. Broker不保存訂閱者的狀態,由訂閱者自己保存。
3. 無狀態導致消息的刪除成為難題(可能刪除的消息正在被訂閱),kafka采用基於時間的SLA(服務水平保證),消息保存一定時間(通常為7天)后會被刪除。
4. 消息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset進行重新讀取消費消息。
 
4.2 message的交付與生命周期 :
1. 不是嚴格的JMS, 因此 kafka對消息的重復、丟失、錯誤以及順序型沒有嚴格的要求。(這是與AMQ最大的區別)
2. kafka提供at-least-once delivery,即當consumer宕機后,有些消息可能會被重復delivery。
3. 因每個partition只會被consumer group內的一個consumer消費,故kafka保證每個partition內的消息會被順序的訂閱。
4. Kafka為每條消息為每條消息計算CRC校驗,用於錯誤檢測,crc校驗不通過的消息會直接被丟棄掉。
 
4.3 壓縮
 
Kafka支持以集合(batch)為單位發送消息,在此基礎上,Kafka還支持對消息集合進行壓縮,Producer端可以通過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮之后,在Consumer端需進行解壓。壓縮的好處就是減少傳輸的數據量,減輕對網絡傳輸的壓力,在對大數據處理上,瓶頸往往體現在網絡上而不是CPU。
 
那么如何區分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節,這個字節的后兩位表示消息的壓縮采用的編碼,如果后兩位為0,則表示消息未被壓縮。
 
4.4 消息可靠性
 
在消息系統中,保證消息在生產和消費過程中的可靠性是十分重要的,在實際消息傳遞過程中,可能會出現如下三中情況:
 
- 一個消息發送失敗
 
- 一個消息被發送多次
 
- 最理想的情況:exactly-once ,一個消息發送成功且僅發送了一次
 
有許多系統聲稱它們實現了exactly-once,但是它們其實忽略了生產者或消費者在生產和消費過程中有可能失敗的情況。比如雖然一個Producer成功發送一個消息,但是消息在發送途中丟失,或者成功發送到broker,也被consumer成功取走,但是這個consumer在處理取過來的消息時失敗了。
 
從Producer端看:Kafka是這么處理的,當一個消息被發送后,Producer會等待broker成功接收到消息的反饋(可通過參數控制等待時間),如果消息在途中丟失或是其中一個broker掛掉,Producer會重新發送(我們知道Kafka有備份機制,可以通過參數控制是否等待所有備份節點都收到消息)。
 
從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程中掛掉,此時Consumer可以通過這個offset值重新找到上一個消息再進行處理。Consumer還有權限控制這個offset值,對持久化到broker端的消息做任意處理。
 
4.5 備份機制
 
備份機制是Kafka0.8版本的新特性,備份機制的出現大大提高了Kafka集群的可靠性、穩定性。有了備份機制后,Kafka允許集群中的節點掛掉后而不影響整個集群工作。一個備份數量為n的集群允許n-1個節點失敗。在所有備份節點中,有一個節點作為lead節點,這個節點保存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:
 
 
4.6 Kafka高效性相關設計
 
4.6.1 消息的持久化
Kafka高度依賴文件系統來存儲和緩存消息(AMQ的nessage是持久化到mysql數據庫中的),因為一般的人認為磁盤是緩慢的,這導致人們對持久化結構具有競爭性持懷疑態度。其實,磁盤的快或者慢,這決定於我們如何使用磁盤。 因為磁盤線性寫的速度遠遠大於隨機寫。線性讀寫在大多數應用場景下是可以預測的。
4.6.2 常數時間性能保證
每個Topic的Partition的是一個大文件夾,里面有無數個小文件夾segment,但partition是一個隊列,隊列中的元素是segment,消費的時候先從第0個segment開始消費,新來message存在最后一個消息隊列中。對於segment也是對隊列,隊列元素是message,有對應的offsite標識是哪個message。消費的時候先從這個segment的第一個message開始消費,新來的message存在segment的最后。
 
消息系統的 持久化隊列可以構建在對一個文件的讀和追加上,就像一般情況下的日志解決方案。它有一個優點,所有的操作都是常數時間,並且讀寫之間不會相互阻塞。這種設計具有極大的性能優勢:最終系統性能和數據大小完全無關,服務器可以充分利用廉價的硬盤來提供高效的消息服務。
 
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味着我們可以提供一般消息系統無法提供的特性。比如說,消息被消費后不是立馬被刪除,我們可以將這些消息保留一段相對比較長的時間(比如一個星期)。
 
5.Kafka 生產者-消費者
     消息系統通常都會由生產者,消費者,Broker三大部分組成,生產者會將消息寫入到Broker,消費者會從Broker中讀取出消息,不同的MQ實現的Broker實現會有所不同,不過Broker的本質都是要負責將消息落地到服務端的存儲系統中。具體步驟如下:
  1. 生產者客戶端應用程序產生消息:

    1. 客戶端連接對象將消息包裝到請求中發送到服務端
    2. 服務端的入口也有一個連接對象負責接收請求,並將消息以文件的形式存儲起來
    3. 服務端返回響應結果給生產者客戶端
  2. 消費者客戶端應用程序消費消息:

    1. 客戶端連接對象將消費信息也包裝到請求中發送給服務端
    2. 服務端從文件存儲系統中取出消息
    3. 服務端返回響應結果給消費者客戶端
    4. 客戶端將響應結果還原成消息並開始處理消息
 
                                                                              圖4-1 客戶端和服務端交互
 
5.1  Producers
 
Producers直接發送消息到broker上的leader partition,不需要經過任何中介或其他路由轉發。為了實現這個特性,kafka集群中的每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的。
 
Producer客戶端自己控制着消息被推送到哪些partition。實現的方式可以是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的partition,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個partition。
 
以Batch的方式推送數據可以極大的提高處理效率,kafka Producer 可以將消息在內存中累計到一定數量后作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設置為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設置需要在效率和時效性方面做一個權衡。
 
Producers可以異步的並行的向kafka發送消息,但是通常producer在發送完消息之后會得到一個future響應,返回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到確認的副本個數,如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這樣有可能會導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。
 
若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。
 
Kafka 消息有一個定長的header和變長的字節數組組成。因為kafka消息支持字節數組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。
 
發布消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發布,可以往消息集合中添加多條消息,一次行發布),send消息時,producer client需指定消息所屬的topic。
 
5.2  Consumers
Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。
 
在kafka中,當前讀到哪條消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數據。比如,consumer可以通過重設offset值來重新消費已消費過的數據。不管有沒有被消費,kafka會保存數據一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數據。(這一點與AMQ不一樣,AMQ的message一般來說都是持久化到mysql中的,消費完的message會被delete掉)
 
High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態,即每次消費的都是下一個消息。
 
High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。
 
High level api和Low level api是針對consumer而言的,和producer無關。
 
High level api是consumer讀的partition的offsite是存在zookeeper上。High level api 會啟動另外一個線程去每隔一段時間,offsite自動同步到zookeeper上。換句話說,如果使用了High level api, 每個message只能被讀一次,一旦讀了這條message之后,無論我consumer的處理是否ok。High level api的另外一個線程會自動的把offiste+1同步到zookeeper上。如果consumer讀取數據出了問題,offsite也會在zookeeper上同步。因此,如果consumer處理失敗了,會繼續執行下一條。這往往是不對的行為。因此,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,但是最后讀的這一條數據是丟失了,因為在zookeeper里面的offsite已經+1了。等再次啟動conusmer group的時候,已經從下一條開始讀取處理了。
 
Low level api 是consumer讀的partition的offsite在consumer自己的程序中維護。不會同步到zookeeper上。但是為了kafka manager能夠方便的監控,一般也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite我們自己維護,我們不會+1。下次再啟動的時候,還會從這個offsite開始讀。這樣可以做到exactly once對於數據的准確性有保證。
 
 
對於Consumer group:
1. 允許consumer group(包含多個consumer,如一個集群同時消費)對一個topic進行消費,不同的consumer group之間獨立消費。
2. 為了對減小一個consumer group中不同consumer之間的分布式協調開銷,指定partition為最小的並行消費單位,即一個group內的consumer只能消費不同的partition。
 
 
Consumer與Partition的關系:
- 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許並發的,所以consumer數不要大於partition數
- 如果consumer比partition少,一個consumer會對應於多個partitions,這里主要合理分配consumer數和partition數,否則會導致partition里面的數據被取的不均勻
- 如果consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不同
- 增減consumer,broker,partition會導致rebalance,所以rebalance后consumer對應的partition會發生變化
- High-level接口中獲取不到數據的時候是會block的
 
負載低的情況下可以每個線程消費多個partition。但負載高的情況下,Consumer 線程數最好和Partition數量保持一致。如果還是消費不過來,應該再開 Consumer 進程,進程內線程數同樣和分區數一致。
 
消費消息時,kafka client需指定topic以及partition number(每個partition對應一個邏輯日志流,如topic代表某個產品線,partition代表產品線的日志按天切分的結果),consumer client訂閱后,就可迭代讀取消息,如果沒有消息,consumer client會阻塞直到有新的消息發布。consumer可以累積確認接收到的消息,當其確認了某個offset的消息,意味着之前的消息也都已成功接收到,此時broker會更新zookeeper上地offset registry。
 
5.3 高效的數據傳輸
1.  發布者每次可發布多條消息(將消息加到一個消息集合中發布), consumer每次迭代消費一條消息。

2.  不創建單獨的cache,使用系統的page cache。發布者順序發布,訂閱者通常比發布者滯后一點點,直接使用linux的page cache效果也比較后,同時減少了cache管理及垃圾收集的開銷。

3.  使用sendfile優化網絡傳輸,減少一次內存拷貝。
 
6.Kafka 與 Zookeeper
 
6.1 Zookeeper 協調控制
1. 管理broker與consumer的動態加入與離開。(Producer不需要管理,隨便一台計算機都可以作為Producer向Kakfa Broker發消息)
2. 觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一
   個 consumer group內的多個consumer的消費負載平衡。(因為一個comsumer消費一個或多個partition,一個partition只能被一個consumer消費)

3.  維護消費關系及每個partition的消費信息。

 

6.2 Zookeeper上的細節:

1. 每個broker啟動后會在zookeeper上注冊一個臨時的broker registry,包含broker的ip地址和端口號,所存儲的topics和partitions信息。

2. 每個consumer啟動后會在zookeeper上注冊一個臨時的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。

3. 每個consumer group關聯一個臨時的owner registry和一個持久的offset registry。對於被訂閱的每個partition包含一個owner registry,內容為訂閱這個partition的consumer id;同時包含一個offset registry,內容為上一次訂閱的offset。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM