一般來說,(1)一個Topic的Partition數量大於等於Broker的數量,可以提高吞吐率。(2)同一個Partition的Replica盡量分散到不同的機器,高可用。
(1)怎樣傳送消息:producer先把message發送到partition leader,再由leader發送給其他partition follower。(如果讓producer發送給每個replica那就太慢了)
(2)在向Producer發送ACK前需要保證有多少個Replica已經收到該消息:根據ack配的個數而定
(3)怎樣處理某個Replica不工作的情況:如果這個部工作的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什么問題。如果這個不工作的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工作的partition replca寫message成功,但是會等到time out,然后返回失敗因為某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工作的partition replica從ack列表中移除,以后的producer發送message的時候就不會有這個ack列表下的這個部工作的partition replica了。
(4)怎樣處理Failed Replica恢復回來的情況:如果這個partition replica之前不在ack列表中,那么啟動后重新受Zookeeper管理即可,之后producer發送message的時候,partition leader會繼續發送message到這個partition follower上。如果這個partition replica之前在ack列表中,此時重啟后,需要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工作的partition replica的時候自動從ack列表中移除的)
- 第二種是Master-Slave模型,只有當Master和所有Slave都接收到消息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數情況下都會中和可靠性和性能選擇第三種模型
消息在broker上的可靠性,因為消息會持久化到磁盤上,所以如果正常stop一個broker,其上的數據不會丟失;但是如果不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這可以通過配置flush頁面緩存的周期、閾值緩解,但是同樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際情況配置。
消息消費的可靠性,Kafka提供的是“At least once”模型,因為消息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper里,但是當消息消費后consumer掛掉,offset沒有即時寫回,就有可能發生重復讀的情況,這種情況同樣可以通過調整commit offset周期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務解決,但是如果你的應用不在乎重復消費,那就干脆不要解決,以換取最大的性能。
-
message狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
-
message持久化:Kafka中會把消息持久化到本地文件系統中,並且保持o(1)極高的效率。我們眾所周知IO讀取是非常耗資源的性能也是最慢的,這就是為了數據庫的瓶頸經常在IO上,需要換SSD硬盤的原因。但是Kafka作為吞吐量極高的MQ,卻可以非常高效的message持久化到文件。這是因為Kafka是順序寫入o(1)的時間復雜度,速度非常快。也是高吞吐量的原因。由於message的寫入持久化是順序寫入的,因此message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。一般的機器,單機每秒100k條數據。
-
message有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。
-
Produer : Producer向Topic發送message,不需要指定partition,直接發送就好了。kafka通過partition ack來控制是否發送成功並把信息返回給producer,producer可以有任意多的thread,這些kafka服務器端是不care的。Producer端的delivery guarantee默認是At least once的。也可以設置Producer異步發送實現At most once。Producer可以用主鍵冪等性實現Exactly once
-
Kafka高吞吐量: Kafka的高吞吐量體現在讀寫上,分布式並發的讀和寫都非常快,寫的性能體現在以o(1)的時間復雜度進行順序寫入。讀的性能體現在以o(1)的時間復雜度進行順序讀取, 對topic進行partition分區,consume group中的consume線程可以以很高能性能進行順序讀。
- Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會丟,絕對不會重復傳輸;(2)At least once 消息絕對不會丟,但是可能會重復傳輸;(3)Exactly once每條信息肯定會被傳輸一次且僅傳輸一次,這是用戶想要的。
-
批量發送:Kafka支持以消息集合為單位進行批量發送,以提高push效率。
-
push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產和消費是異步的。
-
Kafka集群中broker之間的關系:不是主從關系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節點。
-
負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
-
同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。
-
分區機制partition:Kafka的broker端支持消息分區partition,Producer可以決定把消息發到哪個partition,在一個partition 中message的順序就是Producer發送消息的順序,一個topic中可以有多個partition,具體partition的數量是可配置的。partition的概念使得kafka作為MQ可以橫向擴展,吞吐量巨大。partition可以設置replica副本,replica副本存在不同的kafka broker節點上,第一個partition是leader,其他的是follower,message先寫到partition leader上,再由partition leader push到parition follower上。所以說kafka可以水平擴展,也就是擴展partition。
-
離線數據裝載:Kafka由於對可拓展的數據持久化的支持,它也非常適合向Hadoop或者數據倉庫中進行數據裝載。
-
實時數據與離線數據:kafka既支持離線數據也支持實時數據,因為kafka的message持久化到文件,並可以設置有效期,因此可以把kafka作為一個高效的存儲來使用,可以作為離線數據供后面的分析。當然作為分布式實時消息系統,大多數情況下還是用於實時的數據處理的,但是當cosumer消費能力下降的時候可以通過message的持久化在淤積數據在kafka。
-
插件支持:現在不少活躍的社區已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。
-
解耦: 相當於一個MQ,使得Producer和Consumer之間異步的操作,系統之間解耦
-
冗余: replica有多個副本,保證一個broker node宕機后不會影響整個服務
-
擴展性: broker節點可以水平擴展,partition也可以水平增加,partition replica也可以水平增加
-
峰值: 在訪問量劇增的情況下,kafka水平擴展, 應用仍然需要繼續發揮作用
-
可恢復性: 系統的一部分組件失效時,由於有partition的replica副本,不會影響到整個系統。
-
順序保證性:由於kafka的producer的寫message與consumer去讀message都是順序的讀寫,保證了高效的性能。
-
緩沖:由於producer那面可能業務很簡單,而后端consumer業務會很復雜並有數據庫的操作,因此肯定是producer會比consumer處理速度快,如果沒有kafka,producer直接調用consumer,那么就會造成整個系統的處理速度慢,加一層kafka作為MQ,可以起到緩沖的作用。
-
異步通信:作為MQ,Producer與Consumer異步通信
2.Kafka文件存儲機制
2.1 Kafka部分名詞解釋如下:
Kafka中發布訂閱的對象是topic。我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
- Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
- Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
- Segment:partition物理上由多個segment組成,每個Segment存着message信息
- Producer : 生產message發送到topic
- Consumer : 訂閱topic消費message, consumer作為一個線程來消費
- Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
- 2.2 kafka一些原理概念
1.持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且無論任何OS下,對文件系統本身的優化是非常艱難的.文件緩存/直接內存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.對於kafka而言,較高性能的磁盤,將會帶來更加直接的性能提升.
2.性能
除磁盤IO之外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次copy和交換(這里涉及到"磁盤IO數據"/"內核內存"/"進程內存"/"網絡緩沖區",多者之間的數據copy).
其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網絡IO更應該需要考慮.可以將任何在網絡上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式.
3.負載均衡
kafka集群中的任何一個broker,都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".
異步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率;不過這也有一定的隱患,比如當producer失效時,那些尚未發送的消息將會丟失。
4.Topic模型
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之后,consumer可以在本地保存最后消息的offset,並間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級。
kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也適度的減輕了broker端設計的復雜度;這是和眾多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不同,kafka中的消息是批量(通常以消息的條數或者chunk的尺寸為單位)發送給consumer,當消息消費成功后,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬松"的設計,將會有"丟失"消息/"消息重發"的危險.
5.消息傳輸一致
Kafka提供3種消息傳輸一致性語義:最多1次,最少1次,恰好1次。
最少1次:可能會重傳數據,有可能出現數據被重復處理的情況;
最多1次:可能會出現數據丟失情況;
恰好1次:並不是指真正只傳輸1次,只不過有一個機制。確保不會出現“數據被重復處理”和“數據丟失”的情況。
at most once: 消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中consumer進程失效(crash),導致部分消息未能繼續處理.那么此后可能其他consumer會接管,但是因為offset已經提前保存,那么新的consumer將不能fetch到offset之前的消息(盡管它們尚沒有被處理),這就是"at most once".
at least once: 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常或者consumer失效,導致保存offset操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once".
"Kafka Cluster"到消費者的場景中可以采取以下方案來得到“恰好1次”的一致性語義:
最少1次+消費者的輸出中額外增加已處理消息最大編號:由於已處理消息最大編號的存在,不會出現重復處理消息的情況。
6.副本
kafka中,replication策略是基於partition,而不是topic;kafka將每個partition數據復制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日志中;leader負責跟蹤所有的follower狀態,如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條消息保存成功,此消息才被認為是"committed",那么此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網絡環境.即使只有一個replicas實例存活,仍然可以保證消息的正常發送和接收,只要zookeeper集群存活即可.
選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader.
7.log
每個log entry格式為"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每個日志都有一個offset來唯一的標記一條消息,offset的值為8個字節的數字,表示此消息在此partition中所處的起始位置..每個partition在物理存儲層面,有多個log file組成(稱為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
獲取消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,可以找到此消息所在segment文件,然后根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.
8.分布式
kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)
Broker node registry: 當一個kafka broker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.
Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.
Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.
Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那么將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"游離"的partitions)
當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然后在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
總結:
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連接並發送消息.
2) Broker端使用zookeeper用來注冊broker信息,已經監測partition leader存活性.
3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息。
9.Leader的選擇
Kafka的核心是日志文件,日志文件在集群中的同步是分布式數據系統最基礎的要素。
如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統采用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。
Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取並追加到日志中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。
一個邪惡的想法:如果所有節點都down掉了怎么辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。
實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:
1. 等待ISR中的任何一個節點恢復並擔任leader。
2. 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.
這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。
這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。
10.副本管理
以上僅僅以一個topic一個分區為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka盡量的使所有分區均勻的分布到集群所有的節點上而不是集中在某些節點上,另外主從關系也盡量均衡這樣每個幾點都會擔任一定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活着的節點中的一個會備切換為新的controller.
11.Leader與副本同步
對於某個分區來說,保存正分區的"broker"為該分區的"leader",保存備份分區的"broker"為該分區的"follower"。備份分區會完全復制正分區的消息,包括消息的編號等附加屬性值。為了保持正分區和備份分區的內容一致,Kafka采取的方案是在保存備份分區的"broker"上開啟一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。一般情況下,一個分區有一個“正分區”和零到多個“備份分區”。可以配置“正分區+備份分區”的總數量,關於這個配置,不同主題可以有不同的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通信。
Kafka允許topic的分區擁有若干副本,這個數量是可以配置的,你可以為每個topic配置副本的數量。Kafka會自動在每個副本上備份數據,所以當一個節點down掉時數據依然是可用的。
Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份數據。
創建副本的單位是topic的分區,每個分區都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區的數量都比broker的數量多的多,各分區的leader均勻的分布在brokers中。所有的followers都復制leader的日志,日志中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那里拉取消息並保存在自己的日志文件中。
許多分布式的消息系統自動的處理失敗的請求,它們對一個節點是否着(alive)”有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
1. 節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接。
2. 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。
符合以上條件的節點准確的說應該是“同步中的(in sync)”,而不是模糊的說是“活着的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是“太久”,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被所有的副本加入到日志中時,才算是“committed”,只有committed的消息才會發送給consumer,這樣就不用擔心一旦leader down掉了消息會丟失。Producer也可以選擇是否等待消息被提交的通知,這個是由參數acks決定的。
Kafka保證只要有一個“同步中”的節點,“committed”的消息就不會丟失。
一個典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服務器日志等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集群。Kafka通過Zookeeper管理Kafka集群配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,因為consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。
分析過程分為以下4個步驟:
- topic中partition存儲分布
- partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
- partiton中segment文件存儲結構
- 在partition中如何通過offset查找message
通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。
2.3 topic中partition存儲分布
假設實驗環境中Kafka集群只有一個broker,xxx/message-folder為數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如創建2個topic名 稱分別為report_push、launch_info, partitions數量都為partitions=4
存儲路徑和目錄規則為:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition組成,其組織結構如下圖所示:
我們可以看到,Partition是一個Queue的結構,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition上,其中的每一個消息都被賦予了一個唯一的offset值。
Kafka集群會保存所有的消息,不管消息有沒有被消費;
我們可以設定消息的過期時間,只有過期的數據才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內的所有消息都會被保存到集群中,數據只有超過了兩天才會被清除。
Kafka只維護在Partition中的offset值,因為這個offsite標識着這個partition的message消費到哪條了。Consumer每消費一個消息,offset就會加1。其實消息的狀態完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;第二就是可以提高並發,因為可以以Partition為單位讀寫了。
通過上面介紹的我們可以知道,kafka中的數據是持久化的並且能夠容錯的。Kafka允許用戶為每個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。如果你的副本數量設置為3,那么一份數據就會被存放在3台不同的機器上,那么就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到數據消費。如果對數據持久化有更高的要求,可以把副本數量設置為3或者更多。
Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的message的數量。Producer在生產數據時,會按照一定規則(這個規則是可以自定義的)把消息發布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。
關於如何設置partition值需要考慮的因素。
一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。所以,推薦partition的數量一定要大於同時運行的consumer的數量。另外一方面,建議partition的數量大於集群broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內存來緩存消息數據,如果partition數量越大,就要為kafka分配更大的heap space。
2.4 partiton中文件存儲方式
- 每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
- 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。