一、什么是kafka
Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。
二、kafka與其他消息中間件
Redis |
|
RabbitMQ |
|
ZeroMQ |
|
ActiveMQ |
|
Kafka/Jafka |
|
三、kafka解決了什么問題
Kafka主要用途是數據集成,或者說是流數據集成,以Pub/Sub形式的消息總線形式提供。但是,Kafka不僅僅是一套傳統的消息總線,本質上Kafka是分布式的流數據平台,因為以下特性而著名:
- 提供Pub/Sub方式的海量消息處理。
- 以高容錯的方式存儲海量數據流。
- 保證數據流的順序。
常用場景:
四、kafka基本概念
Message(消息):傳遞的數據對象,主要由四部分構成:offset(偏移量)、key、value、timestamp(插入時間); 其中offset和timestamp在kafka集群中產生,key/value在producer發送數據的時候產生Broker(代理者):Kafka集群中的機器/服務被成為broker, 是一個物理概念。
Topic(主題):維護Kafka上的消息類型被稱為Topic,是一個邏輯概念。
Partition(分區):具體維護Kafka上的消息數據的最小單位,一個Topic可以包含多個分區;Partition特性:
ordered & immutable。(在數據的產生和消費過程中,不需要關注數據具體存儲的Partition在那個Broker上,只需要指定Topic即可,由Kafka負責將數據和對應的Partition關聯上)
Producer(生產者):負責將數據發送到Kafka對應Topic的進程
Consumergroup:各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,如果一個message可以被多個consumer(consumer 線程)消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啟動一個新的consumer group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)並發處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業務都需要這個topic的數據,起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
當啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那么這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。
同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不能夠一個consumer group的多個consumer同時消費一個partition。
一個consumer group下,無論有多少個consumer,這個consumer group一定回去把這個topic下所有的partition都消費了。當consumer group里面的consumer數量小於這個topic下的partition數量的時候,如下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。如果consumer group里面的consumer數量等於這個topic下的partition數量的時候,如下圖groupC,此時效率是最高的,每個partition都有一個consumer thread去消費。當consumer group里面的consumer數量大於這個topic下的partition數量的時候,如下圖GroupD,就會有一個consumer thread空閑。因此,我們在設定consumer group的時候,只需要指明里面有幾個consumer數量即可,無需指定對應的消費partition序號,consumer會自動進行rebalance。
多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以o(1)的方式順序的讀取message去消費,,所以一定會重復消費這批message的,不能向AMQ那樣多個BET作為consumer消費(對message加鎖,消費的時候不能重復消費message)
Consumer: Consumer處理partition里面的message的時候是o(1)順序讀取的。所以必須維護着上一次讀到哪里的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也可以配置成讀完消息處理再commit,這種情況下consumer端的響應就會比較慢的,需要等處理完才行。
一般情況下,一定是一個consumer group處理一個topic的message。Best Practice是這個consumer group里面consumer的數量等於topic里面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。如果這個consumer group里面consumer的數量小於topic里面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic里面的所有partition都會被處理到的。。如果這個consumer group里面consumer的數量大於topic里面partition的數量,多出的consumer thread就會閑着啥也不干,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因為一個partition不可能被兩個consumer thread去處理。所以我們線上的分布式多個service服務,每個service里面的kafka consumer數量都小於對應的topic的partition數量,但是所有服務的consumer數量只和等於partition的數量,這是因為分布式service服務的所有consumer都來自一個consumer group,如果來自不同的consumer group就會處理重復的message了(同一個consumer group下的consumer不能處理同一個partition,不同的consumer group可以處理同一個topic,那么都是順序處理message,一定會處理重復的。一般這種情況都是兩個不同的業務邏輯,才會啟動兩個consumer group來處理一個topic)。
五、消息如何生產消費
官網的圖解可以直觀看出消費概覽
需要注意如下幾點:
1)一組(類)消息通常由某個topic來歸類,我們可以把這組消息“分發”給若干個分區(partition),每個分區的消息各不相同;
2)每個分區都維護着他自己的偏移量(Offset),記錄着該分區的消息此時被消費的位置;
3)一個消費線程可以對應若干個分區,但一個分區只能被具體某一個消費線程消費;
4)group.id用於標記某一個消費組,每一個消費組都會被記錄他在某一個分區的Offset,即不同consumer group針對同一個分區,都有“各自”的偏移量。
六、消息投遞
一個消息如何算投遞成功,Kafka提供了三種模式:
- 第一種是啥都不管,發送出去就當作成功,這種情況當然不能保證消息成功投遞到broker;
- 第二種是Master-Slave模型,只有當Master和所有Slave都接收到消息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數情況下都會中和可靠性和性能選擇第三種模型
消息在broker上的可靠性,因為消息會持久化到磁盤上,所以如果正常stop一個broker,其上的數據不會丟失;但是如果不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這可以通過配置flush頁面緩存的周期、閾值緩解,但是同樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際情況配置。
消息消費的可靠性,Kafka提供的是“At least once”模型,因為消息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper里,但是當消息消費后consumer掛掉,offset沒有即時寫回,就有可能發生重復讀的情況,這種情況同樣可以通過調整commit offset周期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務解決,但是如果你的應用不在乎重復消費,那就干脆不要解決,以換取最大的性能。
- Partition ack:當ack=1,表示producer寫partition leader成功后,broker就返回成功,無論其他的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其他一個follower成功的時候,broker就返回成功,無論其他的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer全部寫成功的時候,才算成功,kafka broker才返回成功信息。這里需要注意的是,如果ack=1的時候,一旦有個broker宕機導致partition的follower和leader切換,會導致丟數據。
七、副本
分析過程分為以下4個步驟:
- topic中partition存儲分布
- partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
- partiton中segment文件存儲結構
- 在partition中如何通過offset查找message
通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。
八、zookeeper
kafka leader
Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節點一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時所有的kafka broker又會一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,並選取ISR列表中的一個replica作為partition leader(如果ISR列表中的replica全掛,選一個幸存的replica作為leader; 如果該partition的所有的replica都宕機了,則將新的leader設置為-1,等待恢復,等待ISR中的任一個Replica“活”過來,並且選它作為Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。
Kafka的核心是日志文件,日志文件在集群中的同步是分布式數據系統最基礎的要素。
如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統采用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。
Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取並追加到日志中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。
一個邪惡的想法:如果所有節點都down掉了怎么辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。
實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:
1. 等待ISR中的任何一個節點恢復並擔任leader。
2. 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.
這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。
這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。
分布式
kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)
Broker node registry: 當一個kafka broker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.
Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.
Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.
Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那么將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"游離"的partitions)
當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然后在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
總結:
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連接並發送消息.
2) Broker端使用zookeeper用來注冊broker信息,已經監測partition leader存活性.
3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息。
協調機制
1. 管理broker與consumer的動態加入與離開。(Producer不需要管理,隨便一台計算機都可以作為Producer向Kakfa Broker發消息)
3. 維護消費關系及每個partition的消費信息。
九、開發相關
Producers
Producers直接發送消息到broker上的leader partition,不需要經過任何中介或其他路由轉發。為了實現這個特性,kafka集群中的每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的。
Producer客戶端自己控制着消息被推送到哪些partition。實現的方式可以是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的partition,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個partition。
以Batch的方式推送數據可以極大的提高處理效率,kafka Producer 可以將消息在內存中累計到一定數量后作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設置為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設置需要在效率和時效性方面做一個權衡。
Producers可以異步的並行的向kafka發送消息,但是通常producer在發送完消息之后會得到一個future響應,返回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到確認的副本個數,如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這樣有可能會導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。
若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。
Kafka 消息有一個定長的header和變長的字節數組組成。因為kafka消息支持字節數組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。
發布消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發布,可以往消息集合中添加多條消息,一次行發布),send消息時,producer client需指定消息所屬的topic。
Consumers
Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。
在kafka中,當前讀到哪條消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數據。比如,consumer可以通過重設offset值來重新消費已消費過的數據。不管有沒有被消費,kafka會保存數據一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數據。(這一點與AMQ不一樣,AMQ的message一般來說都是持久化到mysql中的,消費完的message會被delete掉)
High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態,即每次消費的都是下一個消息。
High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。
High level api和Low level api是針對consumer而言的,和producer無關。
High level api是consumer讀的partition的offsite是存在zookeeper上。High level api 會啟動另外一個線程去每隔一段時間,offsite自動同步到zookeeper上。換句話說,如果使用了High level api, 每個message只能被讀一次,一旦讀了這條message之后,無論我consumer的處理是否ok。High level api的另外一個線程會自動的把offiste+1同步到zookeeper上。如果consumer讀取數據出了問題,offsite也會在zookeeper上同步。因此,如果consumer處理失敗了,會繼續執行下一條。這往往是不對的行為。因此,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,但是最后讀的這一條數據是丟失了,因為在zookeeper里面的offsite已經+1了。等再次啟動conusmer group的時候,已經從下一條開始讀取處理了。
Low level api是consumer讀的partition的offsite在consumer自己的程序中維護。不會同步到zookeeper上。但是為了kafka manager能夠方便的監控,一般也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite我們自己維護,我們不會+1。下次再啟動的時候,還會從這個offsite開始讀。這樣可以做到exactly once對於數據的准確性有保證。
借鑒:http://blog.csdn.net/ychenfeng/article/details/74980531