Kafka官方文檔翻譯——設計


下面是博主的公眾號,后續會發布和討論一系列分布式消息隊列相關的內容,歡迎關注。

---------------------------------------------------------------------------------------------------------

Design

1. Motivation

我們設計Kafka用來作為統一的平台來處理大公司可能擁有的所有實時數據源。為了做到這點,我們必須思考大量的使用場景。

它必須有高吞吐去支持大數據流,例如實時日志聚合。

它必須優雅的處理數據積壓,以支持定期從離線系統加載數據。

這也以為這系統必須支持低延遲的分發來處理傳統消息系統的場景。

我們想支持分區的、分布式的、實時的處理數據源並創建新的數據源,這推動了我們的分區和消費模型。

最后,將流反饋到其他系統進行服務的情況下,我們知道系統必須能夠保證容錯性,在部分機器故障的時候提供服務。

支持這些使用推動我們做了一些列特殊的元素設計,比起傳統的消息系統更像是數據庫日志。我們將在以下章節介紹一些設計要素。

2. Persistence

Don't fear the filesystem!

Kafka強依賴文件系統來存儲和緩存消息。“磁盤是緩慢的”是一個通常的認知,這是人們懷疑持久化的結構能否提供強大的性能。事實上,磁盤比人們想象的更慢也更快,這基於如何去使用它們;一個合適的設計可以使磁盤和網絡一樣的快速。

影響磁盤性能的核心因素是磁盤驅動的吞吐和過去十年磁盤的查找方式不同了。使用六個7200rpm的SATA RAID-5陣列的JBOD配置線性寫入能力為600MB/sec,而隨機寫的性能僅僅是100k/sec,相差了6000倍。線性寫入和讀取是最可預測的,並且被操作系統大量的優化。現代操作系統提供read-ahead和write-behind技術,他們大塊預讀數據,並將較小的羅機械合並成較大的物理寫入。在ACM Queue的文章中可以找到此問題相關的進一步討論;他們實際上發現順序訪問磁盤在某些情況下比隨機訪問內存還快。

為了彌補性能的差異,現代操作系統在使用主內存來做磁盤緩存時變的越來越激進。當內存被回收時,現代操作系統將樂意將所有可用內存轉移到磁盤緩存,而且性能會降低很多。所有的磁盤讀寫都需要通過這層緩存。這個功能不會被輕易關閉,除非使用Direct IO,因此盡管在進程內緩存了數據,這些數據也有可能在操作系統的pagecache中緩存,從而被緩存了兩次。

此外,我們建立在JVM之上,任何在Java內存上花費過時間的人都知道兩件事情:

對象的內存開銷非常大,通常將存儲的數據大小翻倍(或更多)。

Java的內存回收隨着堆內數據的增多變的越來越緩慢。

由於這些因素,使用文件系統並依賴於pagecache要優於維護內存中緩存或其他結構——我們至少可以通過直接訪問內存來是可用內存增加一倍,並通過存儲字節碼而不是對象的方式來節約更多的內存。這樣做將可以在32G的機器上使用28-30GB的內存,而不需要承受GC的問題。此外,及時重啟服務,內存會保持有效,而進程內緩存將需要重建(對於10G的數據可能需要10分鍾),否則需要從冷數據加載(可怕的初始化性能)。這也大大簡化了代碼,因為保持緩存和文件之間的一致性是由操作系統負責的,這比進程中操作更不容易出錯。

這是一個簡單的設計:在進程內盡量緩沖數據,空間不足時將所有數據刷寫到磁盤,我們采用了相反的方式。數據並盡快寫入一個持久化的日志而不需要立即刷到磁盤。實際上這只是意味着數據被轉移到了內核的pagecache。

(以pagecache為中心的設計風格)

Constant Time Suffices

在消息系統中使用持久化數據通常是具有關聯的BTree或其他隨機訪問的數據結構,以維護消息的元數據。BTree是最通用的數據結構,可以在消息系統中支持各種各樣的語義。BTree的操作時間復雜度是O(log N)。通常O(log N)被認為是固定時間的,但是在磁盤操作中卻不是。每個磁盤一次只能執行一個seek,所以並行度受到限制。因此即使少量的磁盤搜索也會導致非常高的開銷。由於操作系統將快速的緩存操作和非常慢的磁盤操作相結合,所以觀察到樹結構的操作通常是超線性的,因為數據隨固定緩存增加。

直觀的,持久化隊列可以像日志的解決方案一樣,簡單的讀取和追加數據到文件的結尾。這個結構的優勢是所有的操作都是O(1)的,並且讀取是可以並行不會阻塞的。這具有明顯的性能優勢,因為性能與數據大小完全分離,可以使用低速的TB級SATA驅動器。雖然這些驅動器的搜索性能不佳,但是對於大量讀寫而言,他們的性能是可以接受的,並且價格是三分之一容量是原來的三倍。

無需任何的性能代價就可以訪問幾乎無限的磁盤空間,這意味着我們可以提供一些在消息系統中非尋常的功能。例如,在Kafka中,我們可以將消息保留較長的時間(如一周),而不是在消費后就盡快刪除。這位消費者帶來了很大的靈活性。

3. Efficiency

我們在效率上付出了很大的努力。主要的用例是處理web的數據,這個數據量非常大:每個頁面可能會生成十幾個寫入。此外我們假設每個發布的消息至少被一個Consumer消費,因此我們盡可能使消費的開銷小一些。

從構建和運行一些類似的系統的經驗發現,效率是多租戶操作的關鍵。如果下游基礎服務成為瓶頸,那么應用程序的抖動將會引起問題。我們確保應用程序不會引起基礎服務的Load問題,這個非常重要的,當一個集群服務上百個應用程序的時候,因為應用的使用模式的變化時非常頻繁的。

我們在之前的章節中討論過磁盤的效率。一旦不良的磁盤訪問模式被消除,這種類型的系統有兩個低效的原因:太多太小的IO操作和過多的數據拷貝。

太小的IO操作問題存在於客戶端和服務端之間,也存在於服務端自身的持久化當中。

為了避免這個問題,我們的協議圍繞“message set”抽象,通常是將消息聚合到一起。這允許網絡請求將消息聚合到一起,並分攤網絡往返的開銷,而不是一次發送單個消息。服務端依次將大塊消息追加到日志中,消費者一次線性獲取一批數據。

這種簡單的優化產生了一個數量級的加速。分批帶來了更大的網絡包,連續的磁盤操作,連續的內存塊等等,這些都使得Kafka將隨機消息寫入轉化為線性的寫入並流向Consumer。

其他低效的地方是字符復制。在消息少時不是問題,但是對負載的影響是顯而易見的。為了避免這種情況,我們采用被producer、broker、Consumer共享的標准的二進制消息格式(所以數據可以在傳輸時不需要進行修改)。

由Broker維護的消息日志本身只是一批文件,每個文件由一系列以相同格式寫入的消息構成。保持相同的格式保證了最重要的優化:網絡傳輸和持久化日志塊。現在UNIX操作系統提供了高度優化的代碼路徑用於將pagecache的數據傳輸到網絡;在Linux中,這有sendfile實現。

要劉姐sendfile的影響,了解從文件到網絡傳輸數據的data path非常重要:

  1. 操作系統從磁盤讀取文件數據到pagecache,在內核空間
  2. 用戶從內核空間將數據讀到用戶空間的buffer
  3. 操作系統重新將用戶buffer數據讀取到內核空間寫入到socket中
  4. 操作系統拷貝socket buffer數據到NIC buffer並發送到網絡

這顯然是低效的,有四個副本和兩個系統調用。使用sendfile,允許操作系統直接將數據從pagecache寫入到網絡,而避免不必要的拷貝。在這個過程中,只有最終將數據拷貝到NIC buffer是必要的。

我們期望一個共同的場景是多個Consumer消費一個Topic數據,使用zero-copy優化,數據被拷貝到pagecache並且被多次使用,而不是每次讀取的時候拷貝到內存。這允許以接近網絡連接的速度消費消息。

pagecache和sendfile的組合意味着在消費者追上寫入的情況下,將看不到磁盤上的任何讀取活動,因為他們都將從緩存讀取數據。

sendfile和更多的zero-copy背景知識見zero-copy

End-to-end Batch Compression

在一些場景下,CPU核磁盤並不是性能瓶頸,而是網絡帶寬。在數據中心和廣域網上傳輸數據尤其如此。當然,用戶可以壓縮它的消息而不需要Kafka的支持,但是這可能導致非常差的壓縮比,因為冗余的大部分是由於相同類型的消息之間的重復(例如JSON的字段名)。多個消息進行壓縮比單獨壓縮每條消息效率更高。

Kafka通過允許遞歸消息來支持這一點。一批消息可以一起壓縮並以此方式發送到服務端。這批消息將以壓縮的形式被寫入日志,只能在消費端解壓縮。

Kafka支持GZIP,Snappy和LZ4壓縮協議。更多的壓縮相關的細節在這里

4. The Producer

Load balancing

Producer直接向Leader Partition所在的Broker發送數據而不需要經過任何路由的干預。為了支持Producer直接向Leader Partition寫數據,所有的Kafka服務節點都支持Topic Metadata的請求,返回哪些Server節點存活的、Partition的Leader節點的分布情況。

由客戶端控制將數據寫到哪個Partition。這可以通過隨機或者一些負載均衡的策略來實現(即客戶端去實現Partition的選擇策略)。Kafka暴露了一個接口用於用戶去指定一個Key,通過Key hash到一個具體的Partition。例如,如果Key是User id,那么同一個User的數據將被發送到同一個分區。這樣就允許消費者在消費時能夠對消費的數據做一些特定的處理。這樣的設計被用於處理“局部敏感”的數據(結合上面的場景,Partition內的數據是可以保持順序消費的,那么同一個用戶的數據在一個分區,那么就可以保證對任何一個用戶的處理都是順序的)。

Asynchronous send

批處理是提升效率的主要方式一致,為了支持批處理,Kafka允許Producer在內存聚合數據並在一個請求中發出。批處理的大小可以是通過消息數量指定的,也可以是通過等待的時間決定的(例如64K或者10ms)。這樣允許聚合更多的數據后發送,減少了IO操作。緩沖的數據大小是可以配置了,這樣能適當增加延遲來提升吞吐。

更多的細節可以在Producer的配合和API文檔中找到。

5 The Consumer

Kafka Consumer通過給Leader Partition所在的Broker發送“fetch”請求來進行消費。Consumer在請求中指定Offset,並獲取從指定的Offset開始的一段數據。因此Consumer對消費的位置有絕對的控制權,通過重新設置Offset就可以重新消費數據。

Push vs Pull

我們考慮的一個初步問題是Consumer應該從Broker拉取數據還是Broker將數據推送給Consumer。在這方面,Kafka和大多數消息系統一樣,采用傳統的設計方式,由Producer想Broker推送數據,Consumer從Broker上拉取數據。一些日志中心系統,如Scribe和Apache Flume,遵循數據向下游推送的方式。兩種方式各有利弊。基於推送的方式,由於是由Broker控制速率,不能很好對不同的Consumer做處理。Consumer的目標通常是以最大的速率消費消息,不幸的是,在一個基於推送的系統中,當Consumer消費速度跟不上生產速度 時,推送的方式將使Consumer“過載”。基於拉取的系統在這方面做的更好,Consumer只是消費落后並在允許時可以追上進度。消費者通過某種協議來緩解這種情況,消費者可以通過這種方式來表明它的負載,這讓消費者獲得充分的利用但不會“過載”。以上原因最終使我們使用更為傳統的Pull的方式。

Pull模型的另一個優勢是可以聚合數據批量發送給Consumer。Push模型必須考慮是立即推送數據給Consumer還是等待聚合一批數據之后發送。如果調整為低延遲,這將導致每次只發送一條消息(增加了網絡交互)。基於Pull的模式,Consumer每次都會盡可能多的獲取消息(受限於可消費的消息數和配置的每一批數據最大的消息數),所以可以優化批處理而不增加不必要的延遲。

基於Pull模式的一個缺陷是如果Broker沒有數據,Consumer可能需要busy-waiting的輪訓方式來保證高效的數據獲取(在數據到達后快速的響應)。為了避免這種情況,我們在Pull請求中可以通過參數配置“long poll”的等待時間,可以在服務端等待數據的到達(可選的等待數據量的大小以保證每次傳輸的數據量,減少網絡交互)。

你可以想象其他一些從端到端,采用Pull的可能的設計。Producer把數據寫到本地日志,Broker拉取這些Consumer需要的數據。一個相似的被稱為“store-and-forward”的Producer經常被提及。這是有趣的,但是我們覺得不太適合我們可能會有成千上萬個Producer的目標場景。我們維護持久化數據系統的經驗告訴我們,在系統中使多應用涉及到上千塊磁盤將會使事情變得不可靠並且會使操作它們變成噩夢。最后再實踐中,我們發現可以大規模的運行強大的SLAs通道,而不需要生產者持久化。

Consumer Position

記錄哪些消息被消費過是消息系統的關鍵性能點。

大多數消息系統在Broker上保存哪些消息已經被消費的元數據。也就是說,Broker可以在消費傳遞給Consumer后立即記錄或等待消費者確認之后記錄。這是一個直觀的選擇,並且對於單個服務器而言並沒有更好的方式可以存儲這個狀態。大多數消息系統中的存儲設備並不能很好的伸縮,所以這也是務實的選擇——當Broker確認消息被消費后就立即刪除,以保證存儲較少的數據。

讓Broker和Consumer關於那些消息已經被消費了達成一致並不是一個簡單的問題。如果Broker在將消息寫到網絡之后就立即認為消息已經被消費,那么如果Consumer消費失敗(Consumer在消費消息之前Crash或者網絡問題等)消息將丟失。為了解決這個問題,一些消息系統增加了ACK機制,消息被標記為只是發送出去而不是已經被消費,Broker需要等待Consumer發送的ACK請求之后標記具體哪些消息已經被消費了。這個策略修復了消息丟失的問題,但是引起了新的問題。第一,如果Consumer處理了消息,但是在發送ACK給Broker之前出現問題,那么消息會被重復消息。第二,Broker需要維護每一條消息的多個狀態(是否被發送、是否被消費)。棘手的問題是要處理被發送出去但是沒有被ACK的消息。

Kafka采用不同的方式處理。Topic被划分為多個內部有序的分區,每個分區任何時刻只會被一個group內的一個Consumer消費。這意味着一個Partition的Position信息只是一個數字,標識下一條要消費的消息的偏移量。這使得哪些消息已經被消費的狀態變成了一個簡單的數據。這個位置可以定期做CheckPoint。這使得消息的ACK的代價非常小。

這個方案還有其他的好處。消費者可以優雅的指定一個舊的偏移量並重新消費這些數據。這和通常的消息系統的觀念相違背,但對很多消費者來說是一個很重要的特性。比如,如果Consumer程序存在BUG,在發現並修復后,可以通過重新消費來保證數據都正確的處理。

Offline Data Load

可擴展的持久化存儲的能力,是消費者可以定期的將數據導入到像Hadoop這樣的離線系統或關系型數據倉庫中。

在Hadoop的場景中,我們通過把數據分發到獨立的任務中進行並行處理,按照node/topic/partition組合,充分使用另行能力加載數據。Hadoop提供任務管理,失敗的任務可以重新啟動,而不需要擔心重復數據的危險——任務會從原始位置重新啟動。

6. Message Delivery Semantics

現在我們對Producer和Consumer已經有了一定的了解,接着我們來討論Kafka在Producer和Consumer上提供的語義。顯然的,在分發消息時是可以有多種語義的:

  • At most once:消息可能丟失,但不會重復投遞
  • At least once:消息不會丟失,但可能會重復投遞
  • Exactly once:消息不丟失、不重復,會且只會被分發一次(真正想要的)

值得注意的是這分為兩個問題:發布消息的可用性和消費消息的可用性。

許多系統都聲稱提供“exactly once”語義,仔細閱讀會發現,這些聲明是誤導的(他們沒有考慮Producer和Consumer可能Crash的場景,或是數據寫入磁盤后丟失的情況)。

Kafka提供的語義是直接了當的。發送消息的時候我們有一個消息被Commit到Log的概念。一旦消息已經被Commit,它將不會丟失,只要還有一個復制了消息所在Partition的Broker存活着。“存活”的定義以及我們覆蓋的失敗的情況將在下一節描述。現在假設一個完美的Broker,並且不會丟失,來理解對Producer和Consumer提供的語義保證。如果Producer發送一條消息,並且發生了網絡錯誤,我們是不能確認錯誤發生在消息Commit之前還是消息Commit之后的。類似於使用自增主鍵插入數據庫,是不能確認寫入之后的主鍵值的。

Producer沒有使用的強制可能的語義。我們無法確認網絡是否會發生異常,可以使Producer創建有序的主鍵使重試發送成為冪等的行為。這個特性對一個復制系統來說不是無價值的,因為服務器在發生故障的情況下依舊需要提供服務。使用這個功能,Producer可以重試,直到收到消息成功commit的響應,在這個點上保證消息發送的exactly once。我們希望把這個特性加到后續的Kafka版本中。

不是所有的場景都需要這樣的保證。對應延遲敏感的場景,我們允許Producer指定其期望的可用性級別。如果Producer期望等待消息Commit,那么這可能消耗10ms。Producer也可以指定以異步的方式發送消息或只等Leader節點寫入消息(不能Follower)。

接着我們從消費者的視角來描述語義。所有的副本都擁有偏移量相同的日志。Consumer控制它在日志中的偏移量。如果Consumer一直正常運行,它可以只把偏移量存儲在內存中,但是如果Consumer crash且我們期望另一個新的Consumer接管消費,那么需要選擇一個位置來開始消費。假設Consumer讀取了一些消息——它有集中處理消息和位置的方式。

它可以讀取消息,然后保存位置信息,然后處理消息。在這個場景中,Consumer可能在保存位置信息后消費消息失敗,那么下一次消費可能從保存的位點開始,盡管之前部分消息被處理失敗。這是消費處理過程中失敗的at-most-once(只被處理了一次,但是可能處理失敗)。

它可以讀取消息,之后處理消息,最后保存位置信息。這個場景中,Consumer可能在處理完消息,但是保存位點之前Crash,那么下一次會重新消費這些消息,盡管已經被消費過。這是Consumer Crash引起的at-least-once(消息可能會被處理多次)。

在很多場景沖,消息可以有一個主鍵,這樣可以保證處理的冪等性(多次處理不會有影響)。

那么什么是exactly once語義?這里的限制實際上不是消息系統的特性,而是消息處理和位置信息的保存。經典的解決方案是采用兩階段提交的方式來處理。但是這也可以用一個更簡單的方式來處理:通過將消息處理結果和位置信息保存在同一位置上。這是更好的,因為很多Consumer期望寫入的系統並不支持兩階段提交。例如, 我們的hadoop ETL工具從保存數據到hdfs上的同時也把位移位置也保存到hdfs中了, 這樣可以保證數據和位移位置同時被更新或者都沒更新.我們在很多系統上使用類似的模式, 用於解決那些需要這種強語義但是卻沒有主鍵用於區分重復的儲存系統中.

默認Kafka提供at-least-once語義的消息分發,允許用戶通過在處理消息之前保存位置信息的方式來提供at-most-once語義。exactly-once語義需要和輸出系統像結合,Kafka提供的offset可以使這個實現變的“直接了當的”(變得比較簡單)。

7. Replication

Kafka為Topic的每個Partition日志進行備份,備份數量可以由用戶進行配置。這保證了系統的自動容錯,如果有服務器宕機,消息可以從剩余的服務器中讀取。

其他消息系統提供了備份相關的功能,但在我們看來,這是一個附加的功能,不能被大量使用,並且伴隨着大量的缺點:Slave是不活躍的(這里應該是指Slave只提供了備份,並不可以被消費等等)、吞吐受到很大的影響、需要手動配置等等。在Kafka中,我們默認就提供備份,實際上我們認為沒有備份的Topic是一種特殊的備份,只是備份數為1。

備份的單位是Topic的分區。在沒有發生異常的情況下,Kafka中每個分區都會有一個Leader和0或多個Follower。備份包含Leader在內(也就是說如果備份數為3,那么有一個Leader Partition和兩個Follower Partition)。所有的讀寫請求都落在Leader Partition上。通常情況下分區要比Broker多,Leader分區分布在Broker上。Follower上的日志和Leader上的日志相同,擁有相同的偏移量和消息順序(當然,在特定時間內,Leader上日志會有一部分數據還沒復制到Follower上)。

Follower作為普通的Consumer消費Leader上的日志,並應用到自己的日志中。Leader允許Follower自然的,成批的從服務端獲取日志並應用到自己的日志中。

大部分分布式系統都需要自動處理故障,需要對節點“alive”進行精確的定義。例如在Kafka中,節點存活需要滿足兩個條件:

  1. 節點需要保持它和ZooKeeper之間的Session(通過ZK的心跳機制)
  2. 如果是Follower,需要復制Leader上的寫事件,並且復制進度沒有“落后太多”

我們稱滿足這兩個條件的節點為“同步的”來避免使用“alive”或“failed”這樣模糊的概念。Leader節點保存同步中的Follower節點。如果一個Follower宕機或復制落后太多,Leader將從同步的Follower List中將其移除。通過replica.lag.time.max.ms配置來定義“落后太多”。

在分布式系統的術語中,我們只嘗試處理“失敗/恢復”模型——節點突然停止工作之后恢復的情況。Kafka不處理“拜占庭”問題。

一條消息在被應用到所有的備份上之后被認為是“已經提交的”。只有提交了的消息會被Consumer消費。這意味着Consumer不需要擔心Leader節點宕機后消息會丟失。另一方面,Producer可以配置是否等待消息被提交,這取決於他們在延遲和可用性上的權衡。這個可以通過Producer的配置項中設置。

Kafka提供了一條消息被提交之后,只要還有一個備份可用,消息就不會丟失的保證。

Kafka保證在節點故障后依舊可用,但是無法再網絡分區的情況下保持可用。

Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

Kafka分區機制的核心是日志復制。日志復制是分布式系統中最基礎的東西,有很多方式可以實現。日志復制可以作為基於狀態機的分布式系統的基礎設置。

日志復制模型用於處理連續、有序的輸入(例如給log entry添加0、1、2這樣的編號)。有很多方式實現日志復制,最簡單的方式是Leader選擇和提供這個順序之。只要Leader節點存活,Follower只需要按照Leader選擇的值和順序來復制即可。

當然,如果Leader不會宕機,那我們也不需要Follower了!在Leader宕機之后,我們需要在Follower中選擇一個節點成為新的Leader。Follower可能會宕機或者日志落后較多,所以我們必須確保選擇一個“及時同步”(復制進度和Leader最近的節點)成為新的Leader。復制算法必須提供這樣的保證:如果Client收到一條消息已經被Commit了,如果Leader宕機,新Leader必須包含這條已經被Commit的消息。這是一個權衡:Leader在確認消息Commit之前需要等待更多的Follower來確認復制了消息來保證在Leader宕機后有更多可以成為Leader的Follower節點。

如果你選擇了所需要的ACK的數量以及選擇Leader時需要比較的日志數以確保能重合,這個叫做Quorum。

一個通用的來權衡的方式是提交日志和選擇Leader時都采用大多數投票的原則。這不是Kafka使用的方式,但是無所謂,讓我們去理解這種方式來了解實現原理。假設一共有2f+1個備份,那么f+1的副本必須在Leader提交commit之前接收到消息,這樣就可以從f+1個節點中選擇出新節點作為Leader。因為任何f+1個節點,必然有一個節點包含最全的日志。還有很多關於這個算法的細節需要處理(如何定義日志更全、在Leader節點宕機時保持日志一致性等)在這里先忽略。

大多數選票的方法有非常好的特性:延遲取決於同步最快的Server節點。這說明,如果備份數為3,那么延遲取決於兩個備份節點中較快的節點。

有很多類似的算法變體,例如ZooKeeper的Zab,Raft,Viewstamped Replication等。和Kafka最相似的學術刊物是微軟的PacificA。

大多數選票方式的取消是它不能容忍很多的故障,導致你沒有可以被選為新Leader的節點。為了容忍一個節點故障,需要3分數據備份,容忍兩個節點故障則需要5個節點。在我們的經驗中,只有足夠的冗余才能容忍單一的故障在實際系統中是不夠的,每次寫5次副本,使用5倍的存儲空間,和1/5的帶寬,在大體量的數據存儲上不是很可行。這就是為什么quorum算法多多應用在像ZK這樣存儲配置的集群中,而不是數據存儲系統中。例如HDFS的namenode的高可用建立在大多數選票的機制上,但是數據存儲缺不是。

Kafka使用一個明顯不同的方式來選擇quorum集合。代替大多數選票,Kafka動態的維護一個“同步的備份(in-sync replicas ISR)”的集合。只有這個集合中的成員能被選舉為Leader。一個寫入請求需要同步到所有的同步中的備份才能認為是提交的。ISR集合在變更時會被持久化到ZK。因此,任何ISR中的備份都可以被選舉為新的Leader。這對於Kafka這種擁有多分區並且需要保證這節點負載均衡的模型來說非常重要。使用ISR模型和f+1個副本,Kafka可以容忍f個備份不可用的情況。

對於大多數的場景,我們認為這樣的妥協是合理的。在實踐中,為了容忍f個節點故障,大多數選票原則和ISR方式都需要等待相同的備份在提交消息前進行確認(如需要容忍一個節點故障,大多數選票的選擇需要3個節點,並且提交消息需要至少一個備份的確認;ISR只需要兩個節點,需要確認的副本數一樣是一個)。相對於大多數選票的原則,ISR方式不需要等待最慢的服務器確認消息是一個優勢。盡管如此,我們進行改善,讓客戶端決定是否等待消息提交,使用較小的副本數,這樣帶來的吞吐和更小的磁盤空間要求是有價值的。

另一個重要的設計是Kafka不需要故障的節點恢復所有的數據。這是不常見的,復制算法依賴於存儲介質在任何故障的情況下都不丟失數據並且不違反一致性原則。這個假設有兩個主要的問題。第一,磁盤故障是持久化數據系統中最常見的問題,並且它通常導致數據不完整。第二,即使這不是一個問題,我們也不希望在每一次寫入之后都使用fsync來保證一致性,這會使性能下降兩三個數量級。我們的協議中允許一個副本重新加入到ISR集合中,在重新加入之前,它需要從新同步在故障時丟失的數據。

Unclean leader election: What if they all die?

Kafka保證的數據不丟失,在至少有一個備份保持同步的情況下。如果一個分區所有的備份的節點都故障,那么就不能提供這個保障了。

但是實踐系統中需要一些合理的事情,在所有備份故障時。如果不巧遇上這個問題,去考慮哪些情況會發生是非常重要的。有兩種方式去做:

  1. 等待一個ISR中的副本恢復並將其選舉為新的Leader(期望它擁有所有的數據)。
  2. 選擇第一個副本(無需在ISR中)作為Leader。

這是在可用性和一致性之間的權衡。如果我們等待ISR中的備份恢復,那么會在這個期間一直不可用。如果這樣的副本被損壞,那么我們將永久性的失效。另一方便,如果使用不在ISR中的備份成為Leader,盡管它可能不包含所有的日志。默認情況下,Kafka使用第二種策略,當所有ISR中的備份不可用時,傾向於選擇可能不一致的備份。這個方式可以通過unclean.leader.election.enable配置禁用,在哪些停機時間優於不一致的場景。

這種困境不是kafka特有的, 這存在於任何基於quorum方式的結構中. 例如, 多數投票算法, 如果大多數的服務器都永久性失效了, 你必須選擇丟失全部的數據或者接受某一台可能數據不一致的服務器上的數據.

Availability and Durability Guarantees

在向Kafka寫入數據時,Producer可以選擇是否等待0,1或(-1)個備份響應。注意,這里說的“被所有備份響應”不是說被所有分配的備份響應,默認情況下只的時所有ISR集合中的備份響應。例如,如果一個Topic配置成只需要兩個備份,並且一個備份故障了,那么寫入一個備份即認為收到了所有響應。但是,如果這個備份也故障了,那么數據會丟失。這樣保證了分區的最大可用,但是可能不是那些相對於可用性更需要可靠性的用戶的需求。因此,我們提供兩種Topic級別的配置,相對於可用性,優先保證可靠性:

  1. 禁用unclean leader election;如果所有備份不可用,那么分區保持不可用,直到最近的Leader重新恢復可用。這可能導致不可用,但是不會丟失數據。

  2. 配置一個最小的ISR大小;分區只會在滿足最小ISR的情況下接受請求,這樣可以避免數據只寫入一個備份,而這個備份后續故障導致數據丟失。這個配置只在Producer使用acks=all的配置時有效。這個配置在一致性和可用性上做了權衡。更大的ISR提供了更好的一致性,但是降低了可用性,如果同步備份數小於最小ISR配置時將不可用。

Replica Management

以上的討論都是基於一個日志,即一個Topic的分區考慮的。但是Kafka集群擁有成百上千這樣的分區。我們嘗試使用輪訓的方式來平衡分區,避免高數量的Topic的分區集中在一部分少量的節點上。同樣我們要平衡所有Leader分區,這樣每個節點上承載的主分區都有一定的比例。

優化Leader的選舉過程也是非常重要的,因為這是系統不可用的窗口期。一個直觀的實現是,如果一個節點故障了,為這個節點上所有的分區都獨立的執行一次選舉。代替這種方式,我們選擇一個Broker作為Controller,Controller負責一個故障節點影響的所有分區的Leader變更。這樣的好處是我們可以批量處理,減少獨立選舉時大量的通知,這使得大量分區需要選舉時變得更快,代價更小。如果Controller故障了,剩余的Broker中會有一個節點成為新的Controller。

8 Log Compaction

日志壓縮確保Kafka會為一個Topic分區數據日志中保留至少message key的最后一個值。它解決了應用crash或系統故障或應用在操作期間重啟來重新加載緩存的場景。讓我們深入到細節中解釋日志壓縮是如何工作的。

到屋面位置,我們只說明了在一斷時間或達到特定大小的時候丟棄就日志的簡單方法。這適用於想日志這樣每一條數據都是獨立數據的情況。但是重要類別的數據是根據key處理的數據(例如DB中表的變更數據)。

讓我們來討論這樣一個具體的流的例子。一個Topic包含了用戶email address信息;每一次用戶變更郵箱地址,我們都像這個topic發送一條消息,使用用戶ID作為primay key。現在我們已經為用戶ID為123的用戶發送了一些消息,每條消息包含了email address的變更:

123 => bill@microsoft.com

123 => bill@gatesfoundation.org

123 => bill@gmail.com

日志壓縮為我們提供了更精細的保留機制,至少保存每個key最后一個變更(如123 => bill@gmail.com)。這樣做我們確保了這個日志包含了所有key最后一個值的快照。這樣Consumer可以重建狀態而不需要保留完成的變更日志。

讓我們列一些日志壓縮有用的場景,然后看他是如果被使用的。

  1. DB變更訂閱。這是很常見的,一個數據在多個數據系統中,而且其中一個系統是數據庫類型的(如RDBMS或KV系統)。例如可能有一個數據庫,一個戶緩存系統,一個搜索集群,一個Hadoop集群。DB的任何一個變更需要反映到緩存、搜索集群,最終保存到Hadoop中。在這個場景中,你只需要實時系統最新的更新日志。但是如果需要重新加載緩存或恢復宕機的檢索節點,就需要完整的數據。

  2. 事件源。這是一種應用設計風格,它將查詢處理和應用程序設計結合到一起,並使用日志作為程序的主要存儲。

  3. 高可用日志。一個本地集成程序可以通過變更日志來做到容錯,這樣另一個程序能夠在當前程序故障時繼續處理。例如, 像流數據查詢例子, 如計數, 匯總或其他的分組操作. 實時系統框架如Samza, 就是為了達到這個目的使用這個特性的。

在這些場景中,主要處理實時的變更,但有時需要重新加載或重新處理時,需要加載所有數據。日志壓縮允許使用相同的Topic來支持這些場景,這種日志使用風格在后續的內容中會更詳細的描述。

想法很簡單,我們有有無限的日志,以上每種情況記錄變更日志,我們從一開始就捕獲每一次變更。使用這個完整的日志,我們可以通過回放日志來恢復到任何一個時間點的狀態。這種假設的情況下,完整的日志是不實際的,對於那些每一行記錄會變更多次的系統,即使數據集很小,日志也會無限的增長下去。丟棄舊日志的簡單操作可以限制空間的增長,但是無法重建狀態——因為舊的日志被丟棄,可能一部分記錄的狀態會無法重建(這寫記錄所有的狀態變更都在就日志中)。

日志壓縮機制是更細粒度的,每個記錄都保留的機制,而不是基於時間的粗粒度。這個想法是選擇性的刪除哪些有更新的變更的記錄的日志。這樣最終日志至少包含每個key的記錄的最后一個狀態。

這個策略可以為每個Topic設置,這樣一個集群中,可以一部分Topic通過時間和大小保留日志,另外一些可以通過壓縮保留。

這個功能的靈感來自於LinkedIn的最古老且最成功的基礎設置——一個稱為Databus的數據庫變更日志緩存系統。不想大多數的日志存儲系統,Kafka為了訂閱而量身打造,用於線性的快速讀寫。和Databus不同,Kafka作為真實的存儲,壓縮日志是非常有用的,在上有數據源不能重放的情況下。

Log Compaction Basics

這里是一個展示Kafka日志的邏輯結構的圖(每條消息包含了一個offset):

log_cleaner_anatomy

Log head中包含傳統的Kafka日志。它包含了連續的連續的offset和所有的消息。日志壓縮增加了處理tail Log的選項。上圖展示了日志壓縮的的Log tail的情況。tail中的消息保存了初次寫入時的offset。即使該offset的消息被壓縮,所有offset仍然在日志中是有效的。在這個場景中,無法區分和下一個出現的更高offset的位置。如上面的例子中,36、37、38是屬於相同位置的,從他們開始讀取日志都將從38開始。

壓縮允許刪除。一條消息伴隨着空的值被認為從日志中刪除。這個刪除標記將會引起所有之前擁有相同key的消息被移除(包括擁有key相同的新消息),但是刪除標記比較特殊,它將在一定周期后被從日志中刪除來示范空間。這個時間點被稱為“delete retention point”。

壓縮操作通過在后台周期性的拷貝日志段來完成。清除操作不會阻塞讀取,並且可以被配置不超過一定IO吞吐來避免影響Producer和Consumer。實際的日志段壓縮過程有點像如下:

log_compaction

What guarantees does log compaction provide?

日志壓縮提供了如下的保證:

  1. 所有跟上消費的Consumer能消費到所有寫入的消息;這些消息有連續的序列號。Topic的min.compaction.lag.ms可以用於保證消息寫入多久后才會被壓縮。這限制了一條消息在Log Head中的最短存在時間。

  2. 消息的順序會被保留。壓縮不會重排序消息,只是移除其中一部分。

  3. 消息的Offset不會變更。這是消息在日志中的永久標志。

  4. 任何從頭開始處理日志的Consumer至少會拿到每個key的最終狀態。另外,只要Consumer在小於Topic的delete.retention.ms設置(默認24小時)的時間段內到達Log head,將會看到所有刪除記錄的所有刪除標記。換句話說,因為移除刪除標記和讀取是同事發生的,Consumer可能會因為落后超過delete.retention.ms而導致錯過刪除標記。

Log Compaction Details

日志壓縮由Log Cleaner執行,后台線程池重新拷貝日志段,移除那些key存在於Log Head中的記錄。每個壓縮線程如下工作:

  1. 選擇Log Head相對於Log Head在日志中占更高比例的日志
  2. 創建Log Head中每個Key最后一個offset的摘要
  3. 從頭到尾的拷貝日志,並刪除之后日志終於到相同key的記錄。新的、干凈的日志將會立即被交到到日志中,所以只需要一個額外的日志段空間
  4. Log Head的摘要實際上是一個空間緊湊的哈希表。每個條目使用24個字節。所以如果有8G的整理緩沖區, 則能迭代處理大約366G的日志頭部(假設消息大小為1k)。

Configuring The Log Cleaner

Log Cleaner默認啟用。這會啟動清理的線程池。如果要開始特定Topic的清理功能,可以開啟特定的屬性:

log.cleanup.policy=compact

這個可以通過創建Topic時配置或者之后使用Topic命令實現。

Log Cleaner可以配置保留最小的不壓縮的日志頭。可以通過配置壓縮的延遲時間:

log.cleaner.min.compaction.lag.ms

這可以用於保證消息比在被壓縮的消息大一段時間。如果沒有設置,除了最后一個日志外,所有的日志都會被壓縮。當前寫入的自如端不會被壓縮,即使所有的消息都落后於比配置的最小壓縮時間。

更多的配置在這里

9 Quotas

從0.9版本開始,Kafka可以對生產和消費請求進行限額配置。基於字節速率來限制,每個group中所有的客戶端共享一個限額。

Why are quotas necessary?

Producer和Consumer可能生產或消費大量的數據而耗盡Broker的資源,導致網絡飽和。進行限額可以避免這些問題,特別是在多租戶的集群中,一小部分低質量的客戶端會降低整個集群的體驗。實際上,當運行Kafka作為服務時,這還可以對API的使用進行限制。

Client groups

Kafka客戶端的身份代表了用於鑒權。 在無鑒權機制的集群中, 用戶身份是由服務器使用可配置的PrincipalBuilder進行選擇的, Client-id作為客戶端邏輯分組, 是由客戶端應用選擇的一個有意義的名稱. 標量(user, client-id)定義共享這個用戶身份和客戶端ID的邏輯客戶端分組.

配額可以用於(user, client-id)組合, 或user, client-id分組。

對一個給定的連接, 最符合這個連接的配額被使用到, 一個限額組的所有連接共享這個限額配置, 例如: 如果(user=”test-user”, client-id=”test-client”) 10MB/s的配額, 這個配置會被所有的具有”test-user”用戶和客戶端ID是 “test-client”的所有生產者所共享.

Quota Configuration

配額可以按照(user, client-id)或者, user或client-id進行分組, 如果需要更高或更低的配額, 可以覆蓋默配額, 這個機制類似於對日志主題配置的覆蓋, user 或者 (user, client-id)配額可以覆蓋寫入到zookeeper下的 /config/users ,client-id配置, 可以寫入到 /config/clients。這些覆蓋寫入會被服務器很快的讀取到, 這讓我們修改配置不需要重新啟動服務器. 每個分組的默認配置也可以同樣的方式動態修改。

限額的配置順序如下:

  1. /config/users//clients/
  2. /config/users//clients/
  3. /config/users/
  4. /config/users//clients/
  5. /config/users//clients/
  6. /config/users/
  7. /config/clients/
  8. /config/clients/

Broker的quota.producer.default,quota.consumer.default也可以用來配置默認的client-id分組的默認值。這可屬性已經不鼓勵使用,后續將會刪除。默認client-id限額配置可以和其它默認配置一樣, 在Zookeeper直接設置。

Enforcement

默認情況下,每個唯一的客戶端group會收到一個集群配置的固定的限額。這個限額是基於每個Broker的。每個客戶端能發布或獲取在每台服務器都的最大速率, 我們按服務器定義配置, 而不是按整個集群定義,是因為如果是集群范圍的需要額外的機制來共享配額的使用情況, 這會導致配額機制的實現比較難。

Broker檢測到限額違規時時如何處理的?在我們的解決方案中,Broker不會返回錯誤給客戶端,而是降低客戶端的速率。Broker計算使客戶端回到合理限額的需要的響應延遲。這種方法的處理對客戶端是透明,使他們不必執行任何棘手的,特殊的操作。實際上,錯誤的客戶端還可能加劇正在解決的限額問題。

客戶端字節率在多個小窗口(例如每個1秒的30個窗口)上進行測量,以便快速檢測和糾正配額違規。 通常,具有大的測量窗口(例如,每個30秒的10個窗口)導致大量的流量脈沖,隨后是長時間的延遲,這在用戶體驗方面不是很好。

 


------------------------------------------------------------------------------

 

下面是博主的公眾號,后續會發布和討論一系列分布式消息隊列相關的內容,歡迎關注。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM