http://rdcqii.hundsun.com/portal/article/709.html
KAFKA是分布式發布-訂閱消息系統,是一個分布式的,可划分的,冗余備份的持久性的日志服務。它主要用於處理活躍的流式數據。
現在被廣泛地應用於構建實時數據管道和流應用的場景中,具有橫向擴展,容錯,快等優點,並已經運行在眾多大中型公司的生產環境中,成功應用於大數據領域,本文分享一下我所了解的KAFKA。
1 KAFKA高吞吐率性能揭秘
KAFKA的第一個突出特定就是“快”,而且是那種變態的“快”,在普通廉價的虛擬機器上,比如一般SAS盤做的虛擬機上,據LINDEDIN統計,最新的數據是每天利用KAFKA處理的消息超過1萬億條,在峰值時每秒鍾會發布超過百萬條消息,就算是在內存和CPU都不高的情況下,Kafka的速度最高可以達到每秒十萬條數據,並且還能持久化存儲。
作為消息隊列,要承接讀跟寫兩塊的功能,首先是寫,就是消息日志寫入KAFKA,那么,KAFKA在“寫”上是怎么做到寫變態快呢?
1.1 KAFKA讓代碼飛起來之寫得快
首先,可以使用KAFKA提供的生產端API發布消息到1個或多個Topic(主題)的一個(保證數據的順序)或者多個分區(並行處理,但不一定保證數據順序)。Topic可以簡單理解成一個數據類別,是用來區分不同數據的。
KAFKA維護一個Topic中的分區log,以順序追加的方式向各個分區中寫入消息,每個分區都是不可變的消息隊列。分區中的消息都是以k-v形式存在。
▪ k表示offset,稱之為偏移量,一個64位整型的唯一標識,offset代表了Topic分區中所有消息流中該消息的起始字節位置。
▪ v就是實際的消息內容,每個分區中的每個offset都是唯一存在的,所有分區的消息都是一次寫入,在消息未過期之前都可以調整offset來實現多次讀取。
以上提到KAFKA“快”的第一個因素:消息順序寫入磁盤。
我們知道現在的磁盤大多數都還是機械結構(SSD不在討論的范圍內),如果將消息以隨機寫的方式存入磁盤,就會按柱面、磁頭、扇區的方式進行(尋址過程),緩慢的機械運動(相對內存)會消耗大量時間,導致磁盤的寫入速度只能達到內存寫入速度的幾百萬分之一,為了規避隨機寫帶來的時間消耗,KAFKA采取順序寫的方式存儲數據,如下圖所示:
新來的消息只能追加到已有消息的末尾,並且已經生產的消息不支持隨機刪除以及隨機訪問,但是消費者可以通過重置offset的方式來訪問已經消費過的數據。
即使順序讀寫,過於頻繁的大量小I/O操作一樣會造成磁盤的瓶頸,所以KAFKA在此處的處理是把這些消息集合在一起批量發送,這樣減少對磁盤IO的過度讀寫,而不是一次發送單個消息。
另一個是無效率的字節復制,尤其是在負載比較高的情況下影響是顯着的。為了避免這種情況,KAFKA采用由Producer,broker和consumer共享的標准化二進制消息格式,這樣數據塊就可以在它們之間自由傳輸,無需轉換,降低了字節復制的成本開銷。
同時,KAFKA采用了MMAP(Memory Mapped Files,內存映射文件)技術。很多現代操作系統都大量使用主存做磁盤緩存,一個現代操作系統可以將內存中的所有剩余空間用作磁盤緩存,而當內存回收的時候幾乎沒有性能損失。
由於KAFKA是基於JVM的,並且任何與Java內存使用打過交道的人都知道兩件事:
▪ 對象的內存開銷非常高,通常是實際要存儲數據大小的兩倍;
▪ 隨着數據的增加,java的垃圾收集也會越來越頻繁並且緩慢。
基於此,使用文件系統,同時依賴頁面緩存就比使用其他數據結構和維護內存緩存更有吸引力:
▪ 不使用進程內緩存,就騰出了內存空間,可以用來存放頁面緩存的空間幾乎可以翻倍。
▪ 如果KAFKA重啟,進行內緩存就會丟失,但是使用操作系統的頁面緩存依然可以繼續使用。
可能有人會問KAFKA如此頻繁利用頁面緩存,如果內存大小不夠了怎么辦?
KAFKA會將數據寫入到持久化日志中而不是刷新到磁盤。實際上它只是轉移到了內核的頁面緩存。
利用文件系統並且依靠頁緩存比維護一個內存緩存或者其他結構要好,它可以直接利用操作系統的頁緩存來實現文件到物理內存的直接映射。完成映射之后對物理內存的操作在適當時候會被同步到硬盤上。
1.2 KAFKA讓代碼飛起來之讀得快
KAFKA除了接收數據時寫得快,另外一個特點就是推送數據時發得快。
KAFKA這種消息隊列在生產端和消費端分別采取的push和pull的方式,也就是你生產端可以認為KAFKA是個無底洞,有多少數據可以使勁往里面推送,消費端則是根據自己的消費能力,需要多少數據,你自己過來KAFKA這里拉取,KAFKA能保證只要這里有數據,消費端需要多少,都盡可以自己過來拿。
▲零拷貝
具體到消息的落地保存,broker維護的消息日志本身就是文件的目錄,每個文件都是二進制保存,生產者和消費者使用相同的格式來處理。維護這個公共的格式並允許優化最重要的操作:網絡傳輸持久性日志塊。 現代的unix操作系統提供一個優化的代碼路徑,用於將數據從頁緩存傳輸到socket;在Linux中,是通過sendfile系統調用來完成的。Java提供了訪問這個系統調用的方法:FileChannel.transferTo API。
要理解senfile的影響,重要的是要了解將數據從文件傳輸到socket的公共數據路徑,如下圖所示,數據從磁盤傳輸到socket要經過以下幾個步驟:
▪ 操作系統將數據從磁盤讀入到內核空間的頁緩存
▪ 應用程序將數據從內核空間讀入到用戶空間緩存中
▪ 應用程序將數據寫回到內核空間到socket緩存中
▪ 操作系統將數據從socket緩沖區復制到網卡緩沖區,以便將數據經網絡發出
這里有四次拷貝,兩次系統調用,這是非常低效的做法。如果使用sendfile,只需要一次拷貝就行:允許操作系統將數據直接從頁緩存發送到網絡上。所以在這個優化的路徑中,只有最后一步將數據拷貝到網卡緩存中是需要的。
常規文件傳輸和zeroCopy方式的性能對比:
假設一個Topic有多個消費者的情況, 並使用上面的零拷貝優化,數據被復制到頁緩存中一次,並在每個消費上重復使用,而不是存儲在存儲器中,也不在每次讀取時復制到用戶空間。 這使得以接近網絡連接限制的速度消費消息。
這種頁緩存和sendfile組合,意味着KAFKA集群的消費者大多數都完全從緩存消費消息,而磁盤沒有任何讀取活動。
▲批量壓縮
在很多情況下,系統的瓶頸不是CPU或磁盤,而是網絡帶寬,對於需要在廣域網上的數據中心之間發送消息的數據流水線尤其如此。所以數據壓縮就很重要。可以每個消息都壓縮,但是壓縮率相對很低。所以KAFKA使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮。
KAFKA允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸並且在日志中也可以保持壓縮格式,直到被消費者解壓縮。
KAFKA支持Gzip和Snappy壓縮協議。
2 KAFKA數據可靠性深度解讀
KAFKA的消息保存在Topic中,Topic可分為多個分區,為保證數據的安全性,每個分區又有多個Replia。
▪ 多分區的設計的特點:
1.為了並發讀寫,加快讀寫速度;
2.是利用多分區的存儲,利於數據的均衡;
3.是為了加快數據的恢復速率,一但某台機器掛了,整個集群只需要恢復一部分數據,可加快故障恢復的時間。
每個Partition分為多個Segment,每個Segment有.log和.index 兩個文件,每個log文件承載具體的數據,每條消息都有一個遞增的offset,Index文件是對log文件的索引,Consumer查找offset時使用的是二分法根據文件名去定位到哪個Segment,然后解析msg,匹配到對應的offset的msg。
2.1 Partition recovery過程
每個Partition會在磁盤記錄一個RecoveryPoint,,記錄已經flush到磁盤的最大offset。當broker 失敗重啟時,會進行loadLogs。首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint的segment及以后的segment, 這些segment就是可能沒有完全flush到磁盤segments。然后調用segment的recover,重新讀取各個segment的msg,並重建索引。每次重啟KAFKA的broker時,都可以在輸出的日志看到重建各個索引的過程。
2.2 數據同步
Producer和Consumer都只與Leader交互,每個Follower從Leader拉取數據進行同步。
如上圖所示,ISR是所有不落后的replica集合,不落后有兩層含義:距離上次FetchRequest的時間不大於某一個值或落后的消息數不大於某一個值,Leader失敗后會從ISR中隨機選取一個Follower做Leader,該過程對用戶是透明的。
當Producer向Broker發送數據時,可以通過request.required.acks參數設置數據可靠性的級別。
此配置是表明當一次Producer請求被認為完成時的確認值。特別是,多少個其他brokers必須已經提交了數據到它們的log並且向它們的Leader確認了這些信息。
▪典型的值:
0: 表示Producer從來不等待來自broker的確認信息。這個選擇提供了最小的時延但同時風險最大(因為當server宕機時,數據將會丟失)。
1:表示獲得Leader replica已經接收了數據的確認信息。這個選擇時延較小同時確保了server確認接收成功。
-1:Producer會獲得所有同步replicas都收到數據的確認。同時時延最大,然而,這種方式並沒有完全消除丟失消息的風險,因為同步replicas的數量可能是1。如果你想確保某些replicas接收到數據,那么你應該在Topic-level設置中選項min.insync.replicas設置一下。
僅設置 acks= -1 也不能保證數據不丟失,當ISR列表中只有Leader時,同樣有可能造成數據丟失。要保證數據不丟除了設置acks=-1,還要保證ISR的大小大於等於2。
▪具體參數設置:
request.required.acks:設置為-1 等待所有ISR列表中的Replica接收到消息后采算寫成功。
min.insync.replicas: 設置為>=2,保證ISR中至少兩個Replica。
Producer:要在吞吐率和數據可靠性之間做一個權衡。
KAFKA作為現代消息中間件中的佼佼者,以其速度和高可靠性贏得了廣大市場和用戶青睞,其中的很多設計理念都是非常值得我們學習的,本文所介紹的也只是冰山一角,希望能夠對大家了解KAFKA有一定的作用。