kafka集群原理介紹


kafka集群原理介紹

@(博客文章)[kafka|大數據]

本系統文章共三篇,分別為

1、kafka集群原理介紹了以下幾個方面的內容:

(1)kafka基礎理論

(2)參數配置

(3)錯誤處理

(4)kafka集群在zookeeper集群中的內容

2、kafka集群操作介紹了kafka集群的安裝與操作

(1)單機版安裝

(2)集群安裝

(3)集群啟停操作

(4)topic相關操作

(5)某個broker掛掉,重啟本機器

(6)某個broker掛掉且無法重啟,使用其它機器代替

(7)擴容

(8)數據遷移

(9)機器下線

(10)增加副本數量

(11)平衡leader

3、kafka集群編程介紹了...

(一)基礎理論

1、相關資料
官方資料,非常詳細:
http://kafka.apache.org/documentation.html#quickstart

以下部分內容來源於此文檔。

2、kafka是什么?
(1)Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka是一個 分布式的、可分區的、可復制的消息系統。它提供了普通消息系統的功能,但具有自己獨特的設計。
(2)可以簡單的理解為:kafka是一個日志集群,各種各樣的服務器將它們自身的日志發送到集群中進行統一匯總和存儲,然后其它機器從集群中拉取消息進行分析處理,如ELT、數據挖掘等。
(3)kafka使用scala語言實現,提供了JAVA API,同時對多種語言都提供了支持。

3、幾個關鍵術語
topic: Kafka將消息以topic為單位進行歸納。
producer: 將向Kafka topic發布消息的程序稱為producers.
consumer: 將預訂topics並消費消息的程序稱為consumer.
broker: Kafka以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker.

4、分區與副本
(1)一個topic是對一組消息的歸納。對每個topic,Kafka 對它的日志進行了分區。
(2)一般而言,一個topic會有多個分區,每個分區會有多個副本。
分區是分了將一個topic分到多個地方存儲,提高並行處理的能力。副本是為了容錯,保證數據不丟失。
(3)對於每一個分區,都會選取一個leader,這個分區的所有讀取都在這個leader中進行,而其它副本會同步leader中的數據,且只做備份。
即leader只是針對一個分區而言,而非整個集群。一個服務器對於某個分區是leader,對於其它分區可能是follower。
(4) Producer將消息發布到它指定的topic中,並負責決定發布到哪個分區。通常簡單的由負載均衡機制隨機選擇分區,但也可以通過特定的分區函數選擇分區。
(5)發布消息通常有兩種模式:隊列模式(queuing)和發布-訂閱模式(publish-subscribe)。隊列模式中,consumers可以同時從服務端讀取消息,每個消息只被其中一個consumer讀到;發布-訂閱模式中消息被廣播到所有的consumer中。
Consumers可以加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer可以在不同的程序中,也可以在不同的機器上。如果所有的consumer都在一個組中,這就成為了傳統的隊列模式,在各consumer中實現負載均衡。
如果所有的consumer都不在不同的組中,這就成為了發布-訂閱模式,所有的消息都被分發到所有的consumer中。
更常見的是,每個topic都有若干數量的consumer組,每個組都是一個邏輯上的“訂閱者”,為了容錯和更好的穩定性,每個組由若干consumer組成。這其實就是一個發布-訂閱模式,只不過訂閱者是個組而不是單個consumer。
(6)有序性

相比傳統的消息系統,Kafka可以很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,如果多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分 發消息。雖然服務器按順序發布消息,但是消息是被異步的分發到各consumer上,所以當消息到達時可能已經失去了原來的順序,這意味着並發消費將導致 順序錯亂。為了避免故障,這樣的消息系統通常使用“專用consumer”的概念,其實就是只允許一個消費者消費消息,當然這就意味着失去了並發性。

在這方面Kafka做的更好,通過分區的概念,Kafka可以在多個consumer組並發的情況下提供較好的有序性和負載均衡。將每個分區分 只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就可以順序的消費這個分區的消息。因為有多個分區,依然可以在多 個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就允許多少並發消費。

Kafka只能保證一個分區之內消息的有序性,在不同的分區之間是不可以的,這已經可以滿足大部分應用的需求。如果需要topic中所有消息的有序性,那就只能讓這個topic只有一個分區,當然也就只有一個consumer組消費它。

5、數據持久化(本部分內容直接翻譯自官方文檔)

不要畏懼文件系統!

Kafka大量依賴文件系統去存儲和緩存消息。對於硬盤有個傳統的觀念是硬盤總是很慢,這使很多人懷疑基於文件系統的架構能否提供優異的性能。實際上硬盤的快慢完全取決於使用它的方式。設計良好的硬盤架構可以和內存一樣快。

在6塊7200轉的SATA RAID-5磁盤陣列的線性寫速度差不多是600MB/s,但是隨即寫的速度卻是100k/s,差了差不多6000倍。現代的操作系統都對次做了大量的優化,使用了 read-ahead 和 write-behind的技巧,讀取的時候成塊的預讀取數據,寫的時候將各種微小瑣碎的邏輯寫入組織合並成一次較大的物理寫入。對此的深入討論可以查看這里,它們發現線性的訪問磁盤,很多時候比隨機的內存訪問快得多。

為了提高性能,現代操作系統往往使用內存作為磁盤的緩存,現代操作系統樂於把所有空閑內存用作磁盤緩存,雖然這可能在緩存回收和重新分配時犧牲一些性能。所有的磁盤讀寫操作都會經過這個緩存,這不太可能被繞開除非直接使用I/O。所以雖然每個程序都在自己的線程里只緩存了一份數據,但在操作系統的緩存里還有一份,這等於存了兩份數據。

另外再來討論一下JVM,以下兩個事實是眾所周知的:

•Java對象占用空間是非常大的,差不多是要存儲的數據的兩倍甚至更高。

•隨着堆中數據量的增加,垃圾回收回變的越來越困難。

基於以上分析,如果把數據緩存在內存里,因為需要存儲兩份,不得不使用兩倍的內存空間,Kafka基於JVM,又不得不將空間再次加倍,再加上要避免GC帶來的性能影響,在一個32G內存的機器上,不得不使用到28-30G的內存空間。並且當系統重啟的時候,又必須要將數據刷到內存中( 10GB 內存差不多要用10分鍾),就算使用冷刷新(不是一次性刷進內存,而是在使用數據的時候沒有就刷到內存)也會導致最初的時候新能非常慢。但是使用文件系統,即使系統重啟了,也不需要刷新數據。使用文件系統也簡化了維護數據一致性的邏輯。

所以與傳統的將數據緩存在內存中然后刷到硬盤的設計不同,Kafka直接將數據寫到了文件系統的日志中。

常量時間的操作效率

在大多數的消息系統中,數據持久化的機制往往是為每個cosumer提供一個B樹或者其他的隨機讀寫的數據結構。B樹當然是很棒的,但是也帶了一些代價:比如B樹的復雜度是O(log N),O(log N)通常被認為就是常量復雜度了,但對於硬盤操作來說並非如此。磁盤進行一次搜索需要10ms,每個硬盤在同一時間只能進行一次搜索,這樣並發處理就成了問題。雖然存儲系統使用緩存進行了大量優化,但是對於樹結構的性能的觀察結果卻表明,它的性能往往隨着數據的增長而線性下降,數據增長一倍,速度就會降低一倍。

直觀的講,對於主要用於日志處理的消息系統,數據的持久化可以簡單的通過將數據追加到文件中實現,讀的時候從文件中讀就好了。這樣做的好處是讀和寫都是 O(1) 的,並且讀操作不會阻塞寫操作和其他操作。這樣帶來的性能優勢是很明顯的,因為性能和數據的大小沒有關系了。

既然可以使用幾乎沒有容量限制(相對於內存來說)的硬盤空間建立消息系統,就可以在沒有性能損失的情況下提供一些一般消息系統不具備的特性。比如,一般的消息系統都是在消息被消費后立即刪除,Kafka卻可以將消息保存一段時間(比如一星期),這給consumer提供了很好的機動性和靈活性。

6、事務性

之前討論了consumer和producer是怎么工作的,現在來討論一下數據傳輸方面。數據傳輸的事務定義通常有以下三種級別:

最多一次: 消息不會被重復發送,最多被傳輸一次,但也有可能一次不傳輸。

最少一次: 消息不會被漏發送,最少被傳輸一次,但也有可能被重復傳輸.

精確的一次(Exactly once): 不會漏傳輸也不會重復傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的。

大多數消息系統聲稱可以做到“精確的一次”,但是仔細閱讀它們的的文檔可以看到里面存在誤導,比如沒有說明當consumer或producer失敗時怎么樣,或者當有多個consumer並行時怎么樣,或寫入硬盤的數據丟失時又會怎么樣。kafka的做法要更先進一些。當發布消息時,Kafka有一個“committed”的概念,一旦消息被提交了,只要消息被寫入的分區的所在的副本broker是活動的,數據就不會丟失。關於副本的活動的概念,下節文檔會討論。現在假設broker是不會down的。

如果producer發布消息時發生了網絡錯誤,但又不確定實在提交之前發生的還是提交之后發生的,這種情況雖然不常見,但是必須考慮進去,現在Kafka版本還沒有解決這個問題,將來的版本正在努力嘗試解決。

並不是所有的情況都需要“精確的一次”這樣高的級別,Kafka允許producer靈活的指定級別。比如producer可以指定必須等待消息被提交的通知,或者完全的異步發送消息而不等待任何通知,或者僅僅等待leader聲明它拿到了消息(followers沒有必要)。

現在從consumer的方面考慮這個問題,所有的副本都有相同的日志文件和相同的offset,consumer維護自己消費的消息的offset,如果consumer不會崩潰當然可以在內存中保存這個值,當然誰也不能保證這點。如果consumer崩潰了,會有另外一個consumer接着消費消息,它需要從一個合適的offset繼續處理。這種情況下可以有以下選擇:

consumer可以先讀取消息,然后將offset寫入日志文件中,然后再處理消息。這存在一種可能就是在存儲offset后還沒處理消息就crash了,新的consumer繼續從這個offset處理,那么就會有些消息永遠不會被處理,這就是上面說的“最多一次”。

consumer可以先讀取消息,處理消息,最后記錄offset,當然如果在記錄offset之前就crash了,新的consumer會重復的消費一些消息,這就是上面說的“最少一次”。

“精確一次”可以通過將提交分為兩個階段來解決:保存了offset后提交一次,消息處理成功之后再提交一次。但是還有個更簡單的做法:將消息的offset和消息被處理后的結果保存在一起。比如用Hadoop ETL處理消息時,將處理后的結果和offset同時保存在HDFS中,這樣就能保證消息和offser同時被處理了

7、關於性能優化

Kafka在提高效率方面做了很大努力。Kafka的一個主要使用場景是處理網站活動日志,吞吐量是非常大的,每個頁面都會產生好多次寫操作。讀方面,假設每個消息只被消費一次,讀的量的也是很大的,Kafka也盡量使讀的操作更輕量化。

我們之前討論了磁盤的性能問題,線性讀寫的情況下影響磁盤性能問題大約有兩個方面:太多的瑣碎的I/O操作和太多的字節拷貝。I/O問題發生在客戶端和服務端之間,也發生在服務端內部的持久化的操作中。
消息集(message set)
為了避免這些問題,Kafka建立了“消息集(message set)”的概念,將消息組織到一起,作為處理的單位。以消息集為單位處理消息,比以單個的消息為單位處理,會提升不少性能。Producer把消息集一塊發送給服務端,而不是一條條的發送;服務端把消息集一次性的追加到日志文件中,這樣減少了瑣碎的I/O操作。consumer也可以一次性的請求一個消息集。
另外一個性能優化是在字節拷貝方面。在低負載的情況下這不是問題,但是在高負載的情況下它的影響還是很大的。為了避免這個問題,Kafka使用了標准的二進制消息格式,這個格式可以在producer,broker和producer之間共享而無需做任何改動。
zero copy
Broker維護的消息日志僅僅是一些目錄文件,消息集以固定隊的格式寫入到日志文件中,這個格式producer和consumer是共享的,這使得Kafka可以一個很重要的點進行優化:消息在網絡上的傳遞。現代的unix操作系統提供了高性能的將數據從頁面緩存發送到socket的系統函數,在linux中,這個函數是sendfile.
為了更好的理解sendfile的好處,我們先來看下一般將數據從文件發送到socket的數據流向:

操作系統把數據從文件拷貝內核中的頁緩存中
應用程序從頁緩存從把數據拷貝自己的內存緩存中
應用程序將數據寫入到內核中socket緩存中
操作系統把數據從socket緩存中拷貝到網卡接口緩存,從這里發送到網絡上。

這顯然是低效率的,有4次拷貝和2次系統調用。Sendfile通過直接將數據從頁面緩存發送網卡接口緩存,避免了重復拷貝,大大的優化了性能。
在一個多consumers的場景里,數據僅僅被拷貝到頁面緩存一次而不是每次消費消息的時候都重復的進行拷貝。這使得消息以近乎網絡帶寬的速率發送出去。這樣在磁盤層面你幾乎看不到任何的讀操作,因為數據都是從頁面緩存中直接發送到網絡上去了。

8、數據壓縮
很多時候,性能的瓶頸並非CPU或者硬盤而是網絡帶寬,對於需要在數據中心之間傳送大量數據的應用更是如此。當然用戶可以在沒有Kafka支持的情況下各自壓縮自己的消息,但是這將導致較低的壓縮率,因為相比於將消息單獨壓縮,將大量文件壓縮在一起才能起到最好的壓縮效果。
Kafka采用了端到端的壓縮:因為有“消息集”的概念,客戶端的消息可以一起被壓縮后送到服務端,並以壓縮后的格式寫入日志文件,以壓縮的格式發送到consumer,消息從producer發出到consumer拿到都被是壓縮的,只有在consumer使用的時候才被解壓縮,所以叫做“端到端的壓縮”。
Kafka支持GZIP和Snappy壓縮協議。

9、producer和consumer

Kafka Producer

消息發送

producer直接將數據發送到broker的leader(主節點),不需要在多個節點進行分發。為了幫助producer做到這點,所有的Kafka節點都可以及時的告知:哪些節點是活動的,目標topic目標分區的leader在哪。這樣producer就可以直接將消息發送到目的地了。

客戶端控制消息將被分發到哪個分區。可以通過負載均衡隨機的選擇,或者使用分區函數。Kafka允許用戶實現分區函數,指定分區的key,將消息hash到不同的分區上(當然有需要的話,也可以覆蓋這個分區函數自己實現邏輯).比如如果你指定的key是user id,那么同一個用戶發送的消息都被發送到同一個分區上。經過分區之后,consumer就可以有目的的消費某個分區的消息。

異步發送

批量發送可以很有效的提高發送效率。Kafka producer的異步發送模式允許進行批量發送,先將消息緩存在內存中,然后一次請求批量發送出去。這個策略可以配置的,比如可以指定緩存的消息達到某個量的時候就發出去,或者緩存了固定的時間后就發送出去(比如100條消息就發送,或者每5秒發送一次)。這種策略將大大減少服務端的I/O次數。

既然緩存是在producer端進行的,那么當producer崩潰時,這些消息就會丟失。Kafka0.8.1的異步發送模式還不支持回調,就不能在發送出錯時進行處理。Kafka 0.9可能會增加這樣的回調函數。見Proposed Producer API.

Kafka Consumer

Kafa consumer消費消息時,向broker發出"fetch"請求去消費特定分區的消息。consumer指定消息在日志中的偏移量(offset),就可以消費從這個位置開始的消息。customer擁有了offset的控制權,可以向后回滾去重新消費之前的消息,這是很有意義的。

10、推還是拉?

Kafka最初考慮的問題是,customer應該從brokes拉取消息還是brokers將消息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分消息系統共同的傳統的設計:producer將消息推送到broker,consumer從broker拉取消息。
一些消息系統比如Scribe和Apache Flume采用了push模式,將消息推送到下游的consumer。這樣做有好處也有壞處:由broker決定消息推送的速率,對於不同消費速率的consumer就不太好處理了。消息系統都致力於讓consumer以最大的速率最快速的消費消息,但不幸的是,push模式下,當broker推送的速率遠大於consumer消費的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統的pull模式。
Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取數據。Push模式必須在不知道下游consumer消費能力和消費策略的情況下決定是立即推送每條消息還是緩存之后批量推送。如果為了避免consumer崩潰而采用較低的推送速率,將可能導致一次只推送較少的消息而造成浪費。Pull模式下,consumer就可以根據自己的消費能力去決定這些策略。
Pull有個缺點是,如果broker沒有可供消費的消息,將導致consumer不斷在循環中輪詢,直到新消息到t達。為了避免這點,Kafka有個參數可以讓consumer阻塞知道新消息到達(當然也可以阻塞知道消息的數量達到某個特定的量這樣就可以批量發送)。

11、消費狀態跟蹤

對消費消息狀態的記錄也是很重要的。
大部分消息系統在broker端的維護消息被消費的記錄:一個消息被分發到consumer后broker就馬上進行標記或者等待customer的通知后進行標記。這樣也可以在消息在消費后立馬就刪除以減少空間占用。
但是這樣會不會有什么問題呢?如果一條消息發送出去之后就立即被標記為消費過的,一旦consumer處理消息時失敗了(比如程序崩潰)消息就丟失了。為了解決這個問題,很多消息系統提供了另外一個個功能:當消息被發送出去之后僅僅被標記為已發送狀態,當接到consumer已經消費成功的通知后才標記為已被消費的狀態。這雖然解決了消息丟失的問題,但產生了新問題,首先如果consumer處理消息成功了但是向broker發送響應時失敗了,這條消息將被消費兩次。第二個問題時,broker必須維護每條消息的狀態,並且每次都要先鎖住消息然后更改狀態然后釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態數據,比如如果消息發送出去但沒有收到消費成功的通知,這條消息將一直處於被鎖定的狀態,
Kafka采用了不同的策略。Topic被分成了若干分區,每個分區在同一時間只被一個consumer消費。這意味着每個分區被消費的消息在日志中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每個分區消費狀態就很容易了,僅僅需要一個整數而已。這樣消費狀態的跟蹤就很簡單了。
這帶來了另外一個好處:consumer可以把offset調成一個較老的值,去重新消費老的消息。這對傳統的消息系統來說看起來有些不可思議,但確實是非常有用的,誰規定了一條消息只能被消費一次呢?consumer發現解析數據的程序有bug,在修改bug后再來解析一次消息,看起來是很合理的額呀!

12、離線處理消息

高級的數據持久化允許consumer每個隔一段時間批量的將數據加載到線下系統中比如Hadoop或者數據倉庫。這種情況下,Hadoop可以將加載任務分拆,拆成每個broker或每個topic或每個分區一個加載任務。Hadoop具有任務管理功能,當一個任務失敗了就可以重啟而不用擔心數據被重新加載,只要從上次加載的位置繼續加載消息就可以了。

13、副本與主從關系(本部分直接翻譯自官方文檔)

Kafka允許topic的分區擁有若干副本,這個數量是可以配置的,你可以為每個topci配置副本的數量。Kafka會自動在每個個副本上備份數據,所以當一個節點down掉時數據依然是可用的。

Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份數據。

創建副本的單位是topic的分區,每個分區都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區的數量都比broker的數量多的多,各分區的leader均勻的分布在brokers中。所有的followers都復制leader的日志,日志中的消息和順序都和leader中的一致。flowers向普通的consumer那樣從leader那里拉取消息並保存在自己的日志文件中。
許多分布式的消息系統自動的處理失敗的請求,它們對一個節點是否
着(alive)”有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:

節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接。
如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。
符合以上條件的節點准確的說應該是“同步中的(in sync)”,而不是模糊的說是“活着的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是“太久”,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被所有的副本加入到日志中時,才算是“committed”,只有committed的消息才會發送給consumer,這樣就不用擔心一旦leader down掉了消息會丟失。Producer也可以選擇是否等待消息被提交的通知,這個是由參數request.required.acks決定的。

Kafka保證只要有一個“同步中”的節點,“committed”的消息就不會丟失。

14、Leader的選擇

Kafka的核心是日志文件,日志文件在集群中的同步是分布式數據系統最基礎的要素。

如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統采用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。

Kafaka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取並追加到日志中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。

一個邪惡的想法:如果所有節點都down掉了怎么辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。
實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:

等待ISR中的任何一個節點恢復並擔任leader。
選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.
這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。

這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。

15、副本管理

以上僅僅以一個topic一個分區為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka盡量的使所有分區均勻的分布到集群所有的節點上而不是集中在某些節點上,另外主從關系也盡量均衡這樣每個幾點都會擔任一定比例的分區的leader.

優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活着的節點中的一個會備切換為新的controller.

16、消息格式

(1)消息格式

消息由一個固定長度的頭部和可變長度的字節數組組成。頭部包含了一個版本號和CRC32校驗碼。


/**

* 具有N個字節的消息的格式如下

*

* 如果版本號是0

*

* 1. 1個字節的 "magic" 標記

*

* 2. 4個字節的CRC32校驗碼

*

* 3. N - 5個字節的具體信息

*

* 如果版本號是1

*

* 1. 1個字節的 "magic" 標記

*

* 2.1個字節的參數允許標注一些附加的信息比如是否壓縮了,解碼類型等

*

* 3.4個字節的CRC32校驗碼

*

* 4. N - 6 個字節的具體信息

*

*/

(2)日志

一個叫做“my_topic”且有兩個分區的的topic,它的日志有兩個文件夾組成,my_topic_0和my_topic_1,每個文件夾里放着具體的數據文件,每個數據文件都是一系列的日志實體,每個日志實體有一個4個字節的整數N標注消息的長度,后邊跟着N個字節的消息。每個消息都可以由一個64位的整數offset標注,offset標注了這條消息在發送到這個分區的消息流中的起始位置。每個日志文件的名稱都是這個文件第一條日志的offset.所以第一個日志文件的名字就是00000000000.kafka.所以每相鄰的兩個文件名字的差就是一個數字S,S差不多就是配置文件中指定的日志文件的最大容量。

消息的格式都由一個統一的接口維護,所以消息可以在producer,broker和consumer之間無縫的傳遞。存儲在硬盤上的消息格式如下所示:

消息長度: 4 bytes (value: 1+4+n)

版本號: 1 byte

CRC校驗碼: 4 bytes

具體的消息: n bytes

(3)寫操作

消息被不斷的追加到最后一個日志的末尾,當日志的大小達到一個指定的值時就會產生一個新的文件。對於寫操作有兩個參數,一個規定了消息的數量達到這個值時必須將數據刷新到硬盤上,另外一個規定了刷新到硬盤的時間間隔,這對數據的持久性是個保證,在系統崩潰的時候只會丟失一定數量的消息或者一個時間段的消息。

(4)讀操作

讀操作需要兩個參數:一個64位的offset和一個S字節的最大讀取量。S通常比單個消息的大小要大,但在一些個別消息比較大的情況下,S會小於單個消息的大小。這種情況下讀操作會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的消息。可以配置單個消息的最大值,這樣服務器就會拒絕大小超過這個值的消息。也可以給客戶端指定一個嘗試讀取的最大上限,避免為了讀到一個完整的消息而無限次的重試。

在實際執行讀取操縱時,首先需要定位數據所在的日志文件,然后根據offset計算出在這個日志中的offset(前面的的offset是整個分區的offset),然后在這個offset的位置進行讀取。定位操作是由二分查找法完成的,Kafka在內存中為每個文件維護了offset的范圍。

下面是發送給consumer的結果的格式:


MessageSetSend (fetch result)

total length     : 4 bytes

error code       : 2 bytes

message 1        : x bytes

...

message n        : x bytes

MultiMessageSetSend (multiFetch result)



total length       : 4 bytes

error code         : 2 bytes

messageSetSend 1

...

messageSetSend n

(5)刪除

日志管理器允許定制刪除策略。目前的策略是刪除修改時間在N天之前的日志(按時間刪除),也可以使用另外一個策略:保留最后的N GB數據的策略(按大小刪除)。為了避免在刪除時阻塞讀操作,采用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查找功能實際是在一個靜態的快照副本上進行的,這類似於Java的CopyOnWriteArrayList。

(6)可靠性保證

日志文件有一個可配置的參數M,緩存超過這個數量的消息將被強行刷新到硬盤。一個日志矯正線程將循環檢查最新的日志文件中的消息確認每個消息都是合法的。合法的標准為:所有文件的大小的和最大的offset小於日志文件的大小,並且消息的CRC32校驗碼與存儲在消息實體中的校驗碼一致。如果在某個offset發現不合法的消息,從這個offset到下一個合法的offset之間的內容將被移除。

有兩種情況必須考慮:1,當發生崩潰時有些數據塊未能寫入。2,寫入了一些空白數據塊。第二種情況的原因是,對於每個文件,操作系統都有一個inode(inode是指在許多“類Unix文件系統”中的一種數據結構。每個inode保存了文件系統中的一個文件系統對象,包括文件、目錄、大小、設備文件、socket、管道, 等等),但無法保證更新inode和寫入數據的順序,當inode保存的大小信息被更新了,但寫入數據時發生了崩潰,就產生了空白數據塊。CRC校驗碼可以檢查這些塊並移除,當然因為崩潰而未寫入的數據塊也就丟失了

二、配置文件

(一)java調優

特別說明一下JVM配置 在bin/kafka-server-start.sh中添加以下內容:

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"

官方的推薦使用G1GC,但感覺還不穩定,還是先用CMS算了。以下為官方推薦內容

-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35

For reference, here are the stats on one of LinkedIn's busiest clusters (at peak): - 15 brokers - 15.5k partitions (replication factor 2) - 400k messages/sec in - 70 MB/sec inbound, 400 MB/sec+ outbound The tuning looks fairly aggressive, but all of the brokers in that cluster have a 90% GC pause time of about 21ms, and they're doing less than 1 young GC per second.

(二)參數說明

kafka中有很多的配置參數,大致可以分為以下4類:

 Broker Configs
 Consumer Configs
 Producer Configs
 New Producer Configs

以下僅對部分重要參數說明並不斷完善,全部的參數說明請參考http://kafka.apache.org/documentation.html#consumerconfigs

broker中的配置只有3個參數是必須提供的:broker.id,log,dir, zookeeper.connect.

1、broker.id=0 用於區分broker,確保每台機器不同,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況

2、log.dirs=/home/data/kafka kafka用於放置消息的目錄,默認為/tmp/kafka-logs。它可以是以逗號分隔的多個目錄,創建新分區時,默認會選擇存在最少分區的目錄。

3、zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka zk用於放置kafka信息的地方。注意一般情況下,直接使用192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181即可,此時kafka的相關信息會放在zk的根目錄下,但如果這個zk集群同時為多個kafka集群,或者其它集群服務,則信息會很混亂,甚至有沖突。因此一般會建一個目錄用於放置kafka集群信息的目錄,此處的目錄為/kafka。注意,這個目錄必須手工創建,kafka不會自動創建這個目錄。此外,在conusmer中也必須使用192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka來讀取topic內容。

4、num.partitions=1 創建topic時,默認的分區數

5、num.network.threads=10 broker用於處理網絡請求的線程數,如不配置默認為3

6、zookeeper.connection.timeout.ms=6000

7、message.max.bytes=1000000000

replica.fetch.max.bytes=1073741824

一條消息的最大字節數,說明如下:

kafka中出現以下異常:

[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.

原因是集群默認每次只能接受約1M的消息,如果客戶端一次發送的消息大於這個數值則會導致異常。
在server.properties中添加以下參數

message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824

同時在consumer.properties中添加以下參數:

fetch.message.max.bytes=1073741824

然后重啟kafka進程即可,現在每次最大可接收100M的消息。

8、delete.topic.enable=true 默認為false,即delete topic時只是marked for deletion,但並不會真正刪除topic。

9、關於日志的保存時間或量:
(1)log.retention.hours=24 消息被刪除前保存多少小時,默認1周168小時
(2)log.retention.bytes 默認為-1,即不限制大小。注意此外的大小是指一個topic的一個分區的最大字節數。
當超出上述2個限制的任何一個時,日志均會被刪除。

也可以在topic級別定義這個參數:

retention.bytes=3298534883328   #3T
retention.bytes與retention.ms

10、同步發送還是異步發送,異步吞吐量較大,但可能引入錯誤,默認為sync
producer.type=sync|async
This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.

11、batch.size 默認值為16384
在async模式下,producer緩存多少個消息后再一起發送

12、compression.type 默認值為none,可選gzip snappy
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

13、default.replication.factor 消息副本的數量,默認為1,即沒有副本

還有一些需要關注的配置項:
Replication configurations
用於follower從leader復制消息的線程數,默認為1
num.replica.fetchers=4
follower每次從leader復制消息的字節數,默認為1M,即1024*1024
replica.fetch.max.bytes=1048576
當follow向leader發送數據請求后,最大的等待時長,默認為500ms replica.fetch.wait.max.ms=500
每隔多久,follower會將其復制的highwater寫到磁盤中,以便出錯時恢復。 replica.high.watermark.checkpoint.interval.ms=5000
follower與leader之間的time out時長,默認為30秒 replica.socket.timeout.ms=30000
socket每次的buffer字節數 replica.socket.receive.buffer.bytes=65536
如果一個follower在這段時長內都沒有向leader發出復制請求,則leader會認為其已經down掉,並從ISR中去掉。
replica.lag.time.max.ms=10000
如果一個follower比leader落后超過這個數據的消息數,則leader會將其從isr中去掉。 replica.lag.max.messages=4000 partition management controller 與replica之間的超時時長 controller.socket.timeout.ms=30000
The buffer size for controller-to-broker-channels
controller.message.queue.size=10
Log configuration
如果在創建topic時沒有指定分區大小,默認的分區大小如下 num.partitions=8
kafka集群可以接收的最大消息字節數,默認為1M.注意,如果增大了這個數值,在consumer中也必須增大這個數值,否則consumer將無法消費這個消息。
message.max.bytes=1000000
當向一個不存在的topic發送消息時,是否允許自動創建topic auto.create.topics.enable=true
kafka保存多久的數據,單位是小時
log.retention.hours=72
The number of messages written to a log partition before we force an fsync on the log. Setting this lower will sync data to disk more
often but will have a major impact on performance. We generally recommend that people make use of replication for durability rather
than depending on single-server fsync, however this setting can be used to be extra certain.下面2個值默認都是Long.MaxValue。
log.flush.interval.ms=10000 log.flush.interval.messages=20000 log.flush.scheduler.interval.ms=2000 log.roll.hours=168 log.retention.check.interval.ms=300000 log.segment.bytes=1073741824 # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 # Socket server configuration
執行請求的線程數,至少與你的磁盤數量相同。 num.io.threads=8
服務器用於處理網絡請求的線程數,一般不需要更改,默認為3. num.network.threads=8
服務器允許最大的請求大小。它可以預防out of memory,而且應該小於java 堆大小。 socket.request.max.bytes=104857600 socket.receive.buffer.bytes=1048576 socket.send.buffer.bytes=1048576 queued.max.requests=16 fetch.purgatory.purge.interval.requests=100 producer.purgatory.purge.interval.requests=100

三、錯誤處理

1、配置kafka時,如果使用zookeeper create /kafka創建了節點,kafka與storm集成時new ZkHosts(zks) 需要改成 new ZkHosts(zks,”/kafka/brokers”),不然會報

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。

storm-kafka插件默認kafka的 zk_path如下:

public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;

2、如果出現以下問題,代表偏移量出錯,建議重新開一個topic

ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.  

3、當沒有某個topic,或者是某個topic的node放置不在默認位置時,會有以下異常:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam         

4、kafka中出現以下異常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
原因是集群默認每次只能接受約1M的消息,如果客戶端一次發送的消息大於這個數值則會導致異常。
在server.properties中添加以下參數

message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824

同時在consumer.properties中添加以下參數:

fetch.message.max.bytes=1073741824
``
然后重啟kafka進程即可,現在每次最大可接收100M的消息。
 
5、open too many files
kafka出現異常,日志提示open too many file
查找文件打開數量
lsof -p 30353 | wc
如果在1000以上,一般都是不正常,走過65535就會出錯。
原因打開了太多producer,沒關閉,調用producer.close()即可。
 

#四、zookeeper中的內容
默認情況,kafka在zk的/brokers目錄下記錄topic相關的信息,但如果在創建topic時,指定了路徑,則放置到固定的路徑中,如:

bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic

創建的topic,其相關信息會放置到/kafka/brokers中,這個目錄中主要包括2個子目錄:ids 和 topics
1、ids:記錄這個kafka集群中有多少個broker
如:
 
ls /kafka/brokers/ids/
3   2   5   4
 
這個集群有4個節點,節點id分別為2,3,4,5。 我們看一下內容

[zk: localhost:2181(CONNECTED) 27] get /kafka/brokers/ids/2
{"jmx_port":-1,"timestamp":"1435833841290","host":"kafka02-log.i.nease.net","version":1,"port":9092}
cZxid = 0x1000e8a68
ctime = Thu Jul 02 18:44:01 HKT 2015
mZxid = 0x1000e8a68
mtime = Thu Jul 02 18:44:01 HKT 2015
pZxid = 0x1000e8a68
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x44e440d0bdf06eb
dataLength = 104
numChildren = 0

記錄着這個節點的一些基本情況。

 
2、topics
先看一下有哪些內容:

[zk: localhost:2181(CONNECTED) 29] ls /kafka/brokers/topics/test30/partitions
[3, 2, 1, 0, 4]
[zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics/test30/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/topics/test30/partitions/0/state
{"controller_epoch":4,"leader":5,"version":1,"leader_epoch":2,"isr":[5]}
cZxid = 0x100017c5e
ctime = Wed Jul 01 14:54:24 HKT 2015
mZxid = 0x1000e8a84
mtime = Thu Jul 02 18:44:01 HKT 2015
pZxid = 0x100017c5e
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

可以看某個分區的leader是哪個,從而讀取kafka消息時,可以從這個leader中讀取數據。


以下內容來自官方文檔:

下面給出了zk中用於保存consumber與brokers相關信息的目錄結構與算法介紹。

關於目錄結構的前提說明:默認情況下,kafka相關的信息放在zk根目錄下的某個路徑中,但也可以設置為單獨的路徑,設置方法見配置選項部分。在我們的集群中,我們建立了一個目錄/kafka作為所有kafka相關信息的保存位置。因此我們在這里所列的/kafka/xyz,對於默認情況應該是/xyz。

broker節點的注冊

[zk: localhost:2181(CONNECTED) 140] get /kafka/brokers/ids/2
{"jmx_port":-1,"timestamp":"1437460315901","host":"gdc-kafka02-log.i.nease.net","version":1,"port":9092}

在zk中,有一個broker節點的列表,列表中的每一項表示一個邏輯broker。在啟動時,broker節點會在zk中的/kafka/broker/ids/目錄下創建一個znode,名稱為配置文件中定義的broker id,如上面所示的/kafka/brokers/ids/2。建立邏輯broker id的目的是允許一個broker節點遷移到另一台機器上,而不會影響到consumer的消費。如果想注冊一個已經存在的broker id會引起錯誤(比如說有2個broker的配置文件都寫了同一個broker id)。

由於broker在zk中注冊的是一個ephemeral znodes,因此當這個broker關機或者掛掉的時候,這個注冊信息會自動刪除,從而會通知consumer這個節點已經不可用。

Topic注冊

ls /kafka/brokers/topics/testtopic/partitions/

3 2 1 0 4

get /kafka/brokers/topics/testtopic/partitions/0/state

{"controller_epoch":9,"leader":5,"version":1,"leader_epoch":26,"isr":[5]}

每個topic都會在zk中注冊,如上面的testtopic有5個分區。

consumer與consumer組
為了彼此協調以及平衡數據的消費,consumer也會在zk中注冊信息。通過設置offsets.storage=zookeeper,可以將consumer的offset保存在zk中,不過這種做法會被逐步淘汰。現在推薦使用kafka作為offset的保存。

一個組內的consumer可以共同消費一個topic,它們擁有同一個group_id。組內的consumer會盡可能公平的將topic的分區切分。

consumer id注冊
每一個consumer都會在zk注冊信息,如:

get /kafka/consumers/console-consumer-30094/ids/console-consumer-30094_gdc-kafka03-log.i.nease.net-1437029151314-d7cdc855
{"version":1,"subscription":{"streaming_ma30_sdc":1},"pattern":"white_list","timestamp":"1437459282749"}

 

consumer offset
conusumer會根據它已經消費的最大的offset,默念會存儲在zk的目錄下(也可以設置為kafka)。

get /kafka/consumers/testtopic/offsets/testtopic/0
1413950858

注意這是一個永久節點,因此當consumer掛掉重啟時可以繼續讀取。

分區owner注冊
每一個broker分區會官能一個consumer組里的一個consumer消費,這個consumer必須建立它對這個分區的占有(ownership),再開始消費。為了建立這個占有關系,consumer會在zk中建立相關的信息。

/kafka/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM