kafka介紹
kafka is a distributed, partitiononed,replicated commited logservice. kafka是一個分布式的、易擴展的、安全性高的消息服務系統。kafka提供了類似於JMS的特性,但在設計實現上又完全不同,它並不是基於JMS規范實現的(kafka的實現不包含事務特性性)。kafka對消息的保存時以Topic進行歸類的,向Topic發送消息的稱謂Producer,從Topic接受消息的稱謂Consumer。kafka集群由多個service組成,每個service在kafka集群中被稱作broker。kafka集群的作用就是存儲從Producer發過來的消息,然后按照一定的規則將消息發送給Consumer。無論是kafka集群本身,還是Producer 或者Consumer,均依賴於zookeeper來管理集群中的信息同步。下圖為kafka基本結構組成圖:

kafka系統名詞解釋
- Topics
一個topic可以認為是一類消息,每個topic可以被划分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個Long型數字,他是log中一條消息的唯一標識。kafka並沒有提供其他額外的索引機制來存儲offset,因為kafka的消息讀寫機制是順序讀寫,保證kafka的吞吐率,幾乎不允許(可以)對消息進行隨機讀取。

kafka和JMS的實現(activeMQ)不同的是:即使消息被消費了,消息仍然不會被立即刪除。日志文件將會根據broker中的配置要求,將message保留一段時間后再刪除。比如將log文件保留2天,那么兩天后,該文件將會被刪除,無論其中的消息是否被消費。kafka通過這種簡單的方式來釋放磁盤空間。
對於consumer而言,他需要保存消費消息的offset,對於offset的保存和使用,由consumer來控制;當consumer正常消費消息時,offset將會“線性的”向前驅動,即,消息將依照順序被消費。實際上,consumer也可以通過指定offset來消費特定的消息(offset將會保存在zookeeper中,參見下文)。
kafka集群幾乎不需要維護任何producer和consumer的狀態消息,這些消息由zookeeper來維護保存,因此,producer和consumer的客戶端非常輕量級,他們可以隨意的加入或者離開,不會對集群造成額外的影響。
partition的設計目的有多個,最根本的原因是kafka基於文件存儲,通過分區,可以將日志內容分布到多個broker上,避免文件尺寸達到單機的存儲上線。可以將一個topic划分成多個partitions,這樣既可以降低對單機磁盤容量的要求,又可以提高系統消息的讀寫速率。此外,越多的partition意味着可以容納更多的consumer,更有效的提升並發性能。
- Distination
一個topic的多個partition被分配在kafka集群的多個broker上,每個broker負責partitions中消息的讀寫操作;此外,kafka還可以配置partition需要的備份個數(replicas),每個partition將會被備份到多個broker中,這樣就增強了系統的可靠性。
基於replicated方案,那么就意味着需要對多個備份進行調度,每個partition都有一個broker為“leader”,其余都為“follower”。leader負責所有的讀寫操作,如果leader失效,那么將會有其他的follower被選舉為新的leader;follower只是單純的和leader進行消息同步即可。由此可見,部署leader的broker承載了partition全部的請求壓力,因此,從集群的整體角度考慮,有多少個partition,就有多少個leader,kafka將會將這些leader均衡的分配在broker上,來確保集群整體的吞吐量和穩定性。
- Producer
Producer將消息發布到指定的topic中,同時,producer還需要指定該消息屬於哪個partition
- Consumer
本質上kafka只支持topic,每一個consumer屬於一個consumer group,每個consumer group可以包含多個consumer。發送到topic的消息只會被訂閱該topic的每個group中的一個consumer消費。
如果所有的consumer都具有相同的group,這種情況和queue很相似,消息將會在consumer之間均衡分配;
如果所有的consumer都在不同的group中,這種情況就是廣播模式,消息會被發送到所有訂閱該topic的group中,那么所有的consumer都會消費到該消息。
kafka的設計原理決定,對於同一個topic,同一個group中consumer的數量不能多於partition的數量,否則就會有consumer無法獲取到消息。
- Guarantees
- 發送到partition中的消息將會按照它的接受順序追加到日志中;
- 對於消費者而言,它的消息消費順序是和日志中的順序一致的;
- 如果partition的replicationfactor為N,那么就允許N-1個broker失效。
kafka使用場景
- Messaging
對於一些常規的消息系統,kafka是個不錯的選擇,partition/replication和容錯,使得kafka具有良好的擴展性和安全性。不過到目前為止,kafka並沒有提供JMS中的“事務性”“消息傳輸擔保機制(消息確認機制)”“消息分組”等企業級特性;kafka只能作為常規的消息系統,在一定程度上,尚未確定消息發送與接收的絕對可靠。
- websit activity tracking
kafka可以作為“網站活性追蹤”的最佳工具;可以將網頁/用戶操作等信息發送到kafka,進行實時監控或者離線分析等。
- Log Aggregation
kafka的特性決定了他非常適合做“日志手機中心”;application可以將操作日志批量“異步”發送到kafka集群中,而不是保存在本地或者DB中;kafka可以對消息進行批量的上傳和壓縮等,這對producer而言,幾乎感覺不到性能的開銷。此時,consumer端可以使用hadoop等其他系統化的存儲和分析系統。
kafka設計原理
kafka設計初衷是希望作為一個統一的信息收集平台,能夠事實的收集和反饋信息,並且能夠支撐較大的數據量,且具備良好的容錯能力。
- 持久性
kafka使用文件存儲消息,這就直接決定了kafka在性能上嚴重依賴於文件系統本身的性能。而且,無論在任何OS下,對文件系統本身的優化幾乎沒有了改進的可能。文件緩存/直接內存映射是常用的提高文件系統性能的手段。因為kafka是對日志文件進行append操作,因此磁盤檢索的開銷還是比較少的,同時,為了減少磁盤寫入的次數,broker會暫時將消息buffer起來,當消息數量達到了一個閾值,在寫入到磁盤,這樣就可以減少磁盤IO的次數。
- 性能
除了磁盤IO性能意外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題。kafka並沒有提供太多高超的技巧。對於producer端而言,可以將消息buffer起來,當消息數量達到一定的閾值時,批量發送給broker;對於consumer端而言,也可以批量的fatch消息,不過消息量的大小是可以通過配置文件進行配置的。對於kafka broker端,sendfile系統調用可以潛在的提高網絡IO性能:將文件數據映射到系統內存中,socket直接讀取相應的內存區域即可,而不需要進程再次進行copy和交換。對於producer、consumer、broker而言,CPU的開支都不大,因此啟用消息壓縮機制是一個很好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網絡IO應該更為重要。可以通過將任何在網路上傳輸的消息進行壓縮來提高網絡IO性能。kafka支持多種壓縮方式(gzip/snappy)。
- 生產者
負載均衡:producer將會和topic下的所有partition leader保持socket連接;消息由producer直接通過socket發送個broker,中間不會經過任何“消息路由”,實際上,消息被發送給哪個broker由producer端決定。如果一個消息有多個partitions,那么在producer端實現消息“均衡分發”是很有必要的。
其中partition leader位置(host:port)保存在zookeeper中,producer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件。
異步發送:將多條消息暫且保存在producer的buffer中,當達到一定的數量閾值時,將他們一起批量發送給broker,延遲批量發送實際上是提高了網絡效率。不過也存在一些隱患,比如當producer失效時,那些尚未發送出去的消息將會丟失。
- 消費者
consumer端向broker發送fatch請求,並告訴其獲取消息的offset,此后,consumer會獲得一定數量的消息,consumer端也可以通過重置offset來重新獲取想要的消息。
在JMS中,topic模型是基於push方式的,即broker將消息推送給consumer端。不過在kafka中,采用的是pull模型,即consumer在和broker建立連接后,主動去pull(也就是fatch)消息;這種模式有自己的優點,首先,consumer可以根據自己的消費需求去fatch合適的消息並進行處理,此外,消費者可以良好的控制消費消息的數量。
在kafka中,partition中的消息只有一個consumer在消費,切不存在消息狀態的控制,也沒有復雜的消息確認機制,可見,kafka broker是相當輕量級的。當消息被consumer接收后,consumer在本地保存最后消費消息的offset,並間歇性的向zookeeper注冊offset。由此可見,consumer也是輕量級的。

kafka在zookeeper中的存儲結構圖如下所示:

kafka安全機制:partition復制備份
kafka將每個partition數據復制到多個broker上,任何一個partition都有一個leader和多個follower(可以沒有)。備份的個數可以通過broker的配置文件進行配置。leader處理所有的read-write請求,follower只需要和leader保持信息同步即可,leader負責跟蹤所有的follower狀態信息,如果follower落后太多或者失效,leader將會把它從replicas同步列表中刪除。當所有的follower將一條消息保存成功,該消息才被認為是發送成功(committed),此時,consumer才能消費它。即使只有一個replicas存活,仍然可以保證消息的正常發送和接受,只要zookeeper集群存活即可。(不同於其他分布式存儲,需要多數派存活)
當leader失效時,需要在follower中選擇一個新的leader(此選舉並非通過zookeeper進行選舉的),可能此時的follower落后於leader,因此需要一個“up-to-date”的follower。選擇新的leader時也需要同時兼顧一個問題,那就是broker上leader的數量,如果一個server上有多個leader,意味着此service將承受更多的IO壓力,所以在選舉時,需要考慮leader的“負載平衡”。
參考文獻
