kafka生產消費原理筆記


一、什么是kafka

  Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。

二、kafka與其他消息中間件

Redis
  • 基於Key-Value對的NoSQL數據庫
  • 入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而如果數據大小超過了10K,Redis則慢的無法忍受;
  • 出隊時,無論數據大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低於Redis。
RabbitMQ
  • Erlang編寫
  • 支持很多的協議:AMQP,XMPP, SMTP, STOMP
  • 非常重量級,更適合於企業級的開發
  • 發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。
ZeroMQ
  • 號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。
  • 高級/復雜的隊列,但是開發人員需要自己組合多種技術框架,技術上的復雜度是對這MQ能夠應用成功的挑戰。
  • 具有一個獨特的非中間件的模式,不需要安裝和運行一個消息服務器或中間件
  • ZeroMQ僅提供非持久性的隊列,也就是說如果宕機,數據將會丟失。
ActiveMQ
  • 類似於ZeroMQ,它能夠以代理人和點對點的技術實現隊列。
  • 類似於RabbitMQ,它少量代碼就可以高效地實現高級應用場景。
Kafka/Jafka
  • 高性能跨語言分布式發布/訂閱消息隊列系統
  • 快速持久化,可以在O(1)的系統開銷下進行消息持久化;
  • 高吞吐,在一台普通的服務器上既可以達到10W/s的吞吐速率;
  • 完全的分布式系統,Broker、Producer、Consumer都原生自動支持分布式,自動實現負載均衡;
  • 支持Hadoop數據並行加載,對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的並行加載機制統一了在線和離線的消息處理。
  • 一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

三、kafka解決了什么問題

Kafka主要用途是數據集成,或者說是流數據集成,以Pub/Sub形式的消息總線形式提供。但是,Kafka不僅僅是一套傳統的消息總線,本質上Kafka是分布式的流數據平台,因為以下特性而著名:

  1. 提供Pub/Sub方式的海量消息處理。
  2. 以高容錯的方式存儲海量數據流。
  3. 保證數據流的順序。

常用場景:

  - 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  - 消息系統:解耦和生產者和消費者、緩存消息等。
  - 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic
來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
  - 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
  - 流式處理:比如spark streaming和storm
  - 事件源

四、kafka基本概念

Message(消息):傳遞的數據對象,主要由四部分構成:offset(偏移量)、key、value、timestamp(插入時間); 其中offset和timestamp在kafka集群中產生,key/value在producer發送數據的時候產生Broker(代理者):Kafka集群中的機器/服務被成為broker, 是一個物理概念。

Topic(主題):維護Kafka上的消息類型被稱為Topic,是一個邏輯概念。

Partition(分區):具體維護Kafka上的消息數據的最小單位,一個Topic可以包含多個分區;Partition特性:

ordered & immutable。(在數據的產生和消費過程中,不需要關注數據具體存儲的Partition在那個Broker上,只需要指定Topic即可,由Kafka負責將數據和對應的Partition關聯上)

Producer(生產者):負責將數據發送到Kafka對應Topic的進程

Consumergroup:各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,如果一個message可以被多個consumer(consumer 線程)消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啟動一個新的consumer group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)並發處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業務都需要這個topic的數據,起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。

    當啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那么這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。

    同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不能夠一個consumer group的多個consumer同時消費一個partition。

    一個consumer group下,無論有多少個consumer,這個consumer group一定回去把這個topic下所有的partition都消費了。當consumer group里面的consumer數量小於這個topic下的partition數量的時候,如下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。如果consumer group里面的consumer數量等於這個topic下的partition數量的時候,如下圖groupC,此時效率是最高的,每個partition都有一個consumer thread去消費。當consumer group里面的consumer數量大於這個topic下的partition數量的時候,如下圖GroupD,就會有一個consumer thread空閑。因此,我們在設定consumer group的時候,只需要指明里面有幾個consumer數量即可,無需指定對應的消費partition序號,consumer會自動進行rebalance。

    多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以o(1)的方式順序的讀取message去消費,,所以一定會重復消費這批message的,不能向AMQ那樣多個BET作為consumer消費(對message加鎖,消費的時候不能重復消費message)

Consumer Rebalance的觸發條件:(1)Consumer增加或刪除會觸發 Consumer Group的Rebalance(2)Broker的增加或者減少都會觸發 Consumer Rebalance

Consumer: Consumer處理partition里面的message的時候是o(1)順序讀取的。所以必須維護着上一次讀到哪里的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也可以配置成讀完消息處理再commit,這種情況下consumer端的響應就會比較慢的,需要等處理完才行。

一般情況下,一定是一個consumer group處理一個topic的message。Best Practice是這個consumer group里面consumer的數量等於topic里面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。如果這個consumer group里面consumer的數量小於topic里面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic里面的所有partition都會被處理到的。。如果這個consumer group里面consumer的數量大於topic里面partition的數量,多出的consumer thread就會閑着啥也不干,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因為一個partition不可能被兩個consumer thread去處理。所以我們線上的分布式多個service服務,每個service里面的kafka consumer數量都小於對應的topic的partition數量,但是所有服務的consumer數量只和等於partition的數量,這是因為分布式service服務的所有consumer都來自一個consumer group,如果來自不同的consumer group就會處理重復的message了(同一個consumer group下的consumer不能處理同一個partition,不同的consumer group可以處理同一個topic,那么都是順序處理message,一定會處理重復的。一般這種情況都是兩個不同的業務邏輯,才會啟動兩個consumer group來處理一個topic)。

 
如果producer的流量增大,當前的topic的parition數量=consumer數量,這時候的應對方式就是很想擴展:增加topic下的partition,同時增加這個consumer group下的consumer。

五、消息如何生產消費

官網的圖解可以直觀看出消費概覽

需要注意如下幾點:

1)一組(類)消息通常由某個topic來歸類,我們可以把這組消息“分發”給若干個分區(partition),每個分區的消息各不相同;

2)每個分區都維護着他自己的偏移量(Offset),記錄着該分區的消息此時被消費的位置;

3)一個消費線程可以對應若干個分區,但一個分區只能被具體某一個消費線程消費;

4)group.id用於標記某一個消費組,每一個消費組都會被記錄他在某一個分區的Offset,即不同consumer group針對同一個分區,都有“各自”的偏移量。

六、消息投遞

一個消息如何算投遞成功,Kafka提供了三種模式:

- 第一種是啥都不管,發送出去就當作成功,這種情況當然不能保證消息成功投遞到broker;

- 第二種是Master-Slave模型,只有當Master和所有Slave都接收到消息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了性能;

- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數情況下都會中和可靠性和性能選擇第三種模型

  消息在broker上的可靠性,因為消息會持久化到磁盤上,所以如果正常stop一個broker,其上的數據不會丟失;但是如果不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這可以通過配置flush頁面緩存的周期、閾值緩解,但是同樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際情況配置。

  消息消費的可靠性,Kafka提供的是“At least once”模型,因為消息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper里,但是當消息消費后consumer掛掉,offset沒有即時寫回,就有可能發生重復讀的情況,這種情況同樣可以通過調整commit offset周期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務解決,但是如果你的應用不在乎重復消費,那就干脆不要解決,以換取最大的性能。

- Partition ack:當ack=1,表示producer寫partition leader成功后,broker就返回成功,無論其他的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其他一個follower成功的時候,broker就返回成功,無論其他的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer全部寫成功的時候,才算成功,kafka broker才返回成功信息。這里需要注意的是,如果ack=1的時候,一旦有個broker宕機導致partition的follower和leader切換,會導致丟數據。

1.持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且無論任何OS下,對文件系統本身的優化是非常艱難的.文件緩存/直接內存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.對於kafka而言,較高性能的磁盤,將會帶來更加直接的性能提升.
 
2.性能
除磁盤IO之外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次copy和交換(這里涉及到"磁盤IO數據"/"內核內存"/"進程內存"/"網絡緩沖區",多者之間的數據copy).
其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網絡IO更應該需要考慮.可以將任何在網絡上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式.
 
3.負載均衡
kafka集群中的任何一個broker,都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".
異步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率;不過這也有一定的隱患,比如當producer失效時,那些尚未發送的消息將會丟失。
 
4.Topic模型
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之后,consumer可以在本地保存最后消息的offset,並間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級。
kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也適度的減輕了broker端設計的復雜度;這是和眾多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不同,kafka中的消息是批量(通常以消息的條數或者chunk的尺寸為單位)發送給consumer,當消息消費成功后,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬松"的設計,將會有"丟失"消息/"消息重發"的危險.

七、副本 

kafka中,replication策略是基於partition,而不是topic;kafka將每個partition數據復制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日志中;leader負責跟蹤所有的follower狀態,如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條消息保存成功,此消息才被認為是"committed",那么此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網絡環境.即使只有一個replicas實例存活,仍然可以保證消息的正常發送和接收,只要zookeeper集群存活即可.
選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader.
副本管理
以上僅僅以一個topic一個分區為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka盡量的使所有分區均勻的分布到集群所有的節點上而不是集中在某些節點上,另外主從關系也盡量均衡這樣每個幾點都會擔任一定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活着的節點中的一個會備切換為新的controller.
 
Leader與副本同步
對於某個分區來說,保存正分區的"broker"為該分區的"leader",保存備份分區的"broker"為該分區的"follower"。備份分區會完全復制正分區的消息,包括消息的編號等附加屬性值。為了保持正分區和備份分區的內容一致,Kafka采取的方案是在保存備份分區的"broker"上開啟一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。一般情況下,一個分區有一個“正分區”和零到多個“備份分區”。可以配置“正分區+備份分區”的總數量,關於這個配置,不同主題可以有不同的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通信。
 
Kafka允許topic的分區擁有若干副本,這個數量是可以配置的,你可以為每個topic配置副本的數量。Kafka會自動在每個副本上備份數據,所以當一個節點down掉時數據依然是可用的。
Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份數據。
創建副本的單位是topic的分區,每個分區都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區的數量都比broker的數量多的多,各分區的leader均勻的分布在brokers中。所有的followers都復制leader的日志,日志中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那里拉取消息並保存在自己的日志文件中。
許多分布式的消息系統自動的處理失敗的請求,它們對一個節點是否着(alive)”有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
1. 節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接。
2. 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。
符合以上條件的節點准確的說應該是“同步中的(in sync)”,而不是模糊的說是“活着的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是“太久”,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被所有的副本加入到日志中時,才算是“committed”,只有committed的消息才會發送給consumer,這樣就不用擔心一旦leader down掉了消息會丟失。Producer也可以選擇是否等待消息被提交的通知,這個是由參數acks決定的。
Kafka保證只要有一個“同步中”的節點,“committed”的消息就不會丟失。


       一個典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服務器日志等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集群。Kafka通過Zookeeper管理Kafka集群配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,因為consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。
 

分析過程分為以下4個步驟:

  • topic中partition存儲分布
  • partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
  • partiton中segment文件存儲結構
  • 在partition中如何通過offset查找message

通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。

八、zookeeper

kafka leader

Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節點一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時所有的kafka broker又會一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,並選取ISR列表中的一個replica作為partition leader(如果ISR列表中的replica全掛,選一個幸存的replica作為leader; 如果該partition的所有的replica都宕機了,則將新的leader設置為-1,等待恢復,等待ISR中的任一個Replica“活”過來,並且選它作為Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。

Kafka的核心是日志文件,日志文件在集群中的同步是分布式數據系統最基礎的要素。

如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統采用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。

Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取並追加到日志中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。

一個邪惡的想法:如果所有節點都down掉了怎么辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。

實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:

1. 等待ISR中的任何一個節點恢復並擔任leader。

2. 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.

這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。

這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。

 

分布式

kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)

Broker node registry: 當一個kafka broker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.

Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.

Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.

Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.

Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.

Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那么將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"游離"的partitions)

當consumer啟動時,所觸發的操作:

A) 首先進行"Consumer id Registry";

B) 然后在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.

 

總結:

1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連接並發送消息.

2) Broker端使用zookeeper用來注冊broker信息,已經監測partition leader存活性.

3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息。

 

協調機制

1. 管理broker與consumer的動態加入與離開。(Producer不需要管理,隨便一台計算機都可以作為Producer向Kakfa Broker發消息)

2. 觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一consumer group內的多個consumer的消費負載平衡。(因為一個comsumer消費一個或多個partition,一個partition只能被一個consumer消費)

3.  維護消費關系及每個partition的消費信息。

九、開發相關 

Producers

Producers直接發送消息到broker上的leader partition,不需要經過任何中介或其他路由轉發。為了實現這個特性,kafka集群中的每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的。

Producer客戶端自己控制着消息被推送到哪些partition。實現的方式可以是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的partition,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個partition。

以Batch的方式推送數據可以極大的提高處理效率,kafka Producer 可以將消息在內存中累計到一定數量后作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設置為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設置需要在效率和時效性方面做一個權衡。

Producers可以異步的並行的向kafka發送消息,但是通常producer在發送完消息之后會得到一個future響應,返回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到確認的副本個數,如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這樣有可能會導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。

若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。

Kafka 消息有一個定長的header和變長的字節數組組成。因為kafka消息支持字節數組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。

發布消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發布,可以往消息集合中添加多條消息,一次行發布),send消息時,producer client需指定消息所屬的topic。

 

Consumers

Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。 

在kafka中,當前讀到哪條消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數據。比如,consumer可以通過重設offset值來重新消費已消費過的數據。不管有沒有被消費,kafka會保存數據一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數據。(這一點與AMQ不一樣,AMQ的message一般來說都是持久化到mysql中的,消費完的message會被delete掉)

High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態,即每次消費的都是下一個消息。 

High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。 

High level api和Low level api是針對consumer而言的,和producer無關。

High level api是consumer讀的partition的offsite是存在zookeeper上。High level api 會啟動另外一個線程去每隔一段時間,offsite自動同步到zookeeper上。換句話說,如果使用了High level api, 每個message只能被讀一次,一旦讀了這條message之后,無論我consumer的處理是否ok。High level api的另外一個線程會自動的把offiste+1同步到zookeeper上。如果consumer讀取數據出了問題,offsite也會在zookeeper上同步。因此,如果consumer處理失敗了,會繼續執行下一條。這往往是不對的行為。因此,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,但是最后讀的這一條數據是丟失了,因為在zookeeper里面的offsite已經+1了。等再次啟動conusmer group的時候,已經從下一條開始讀取處理了。

Low level api是consumer讀的partition的offsite在consumer自己的程序中維護。不會同步到zookeeper上。但是為了kafka manager能夠方便的監控,一般也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite我們自己維護,我們不會+1。下次再啟動的時候,還會從這個offsite開始讀。這樣可以做到exactly once對於數據的准確性有保證。

 

借鑒:http://blog.csdn.net/ychenfeng/article/details/74980531


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM