初識Apache Kafka 核心概念
作者:尹正傑
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
一.kafka概要設計
kafka在設計初衷就是為了解決互聯網公司的超級大量級數據的實時傳輸。為了實現這個目標,kafka在設計之初就需要考慮以下四個方面: (1)吞吐量/延遲 (2)消息持久化 (3)負載均衡和故障轉移 (4)伸縮性
1>.吞吐量/延遲
一.吞吐量/延時介紹 我們先打個比方:若kafka處理一條消息需要花費2ms,那么計算得到的吞吐量不會超過500條消息每秒(1000ms/2ms=500條/s)。但是若我們采用批處理(batching)的思想,假設在發送前我們首先會等待一段時間(假設是8ms),那么此時消息發送的延遲變成了10ms(2ms+8ms),即延遲增加了4倍,但假設在這8ms中我們總共積累了1000條消息,那么整個系統的吞吐量就變成了100000 條/s。 此時你會發現吞吐量提升了200倍!看到micor-batch的威力了吧?這就是目前諸如Storm Trident 和 Spark Streaming等消息處理平台所采用的批處理思想。 二.Kafka如何做到高吞吐量,低延遲的呢? 首先,kafka的寫入操作是很快的,這主要得益於它對磁盤的使用方法的不同。雖然kafka會持久化所有數據到磁盤,但本質上每次寫入操作其實都只是把數據寫入到操作系統的頁緩存(page cache)中,然后由操作系統自行決定什么時候把頁緩存中的數據寫入磁盤上。這樣的設計由三個主要的優勢: (1)操作系統頁緩存是內存中分配的,所以消息寫入的速度非常快; (2)kafka不必直接與底層的文件系統打交道。所以煩瑣的I/O操作都交由操作系統來處理; (3)kafka寫入操作采用追加寫入(append)方式,避免了磁盤隨機寫操作(據資料統計,順序磁盤I/O速度是毫不遜色於隨機讀寫內存I/O速度。感興趣的小伙伴可以使用相關工具測試一下。); 三.Kafka的高吞吐量,低延遲的設計目標 (1)大量使用操作系統頁緩存,內存操作速度快且命中率高; (2)Kafka不直接參與物理I/O操作,而是交由最擅長此時的操作系統來完成; (3)采用追加寫入方式,摒棄了緩慢的磁盤隨機讀/寫操作; (4)使用sendfile為代表的零拷貝技術加強網絡間的數據傳輸效率;
2>.消息持久化的優點
(1)解耦消息發送和消息消費 本質上來說,kakfa最核心的功能就是提供了生產者-消費者模式的完整解決方案。通過將消息持久化使得生產者方不再需要直接和消費者方耦合,它只是簡單的把消息生產出來並交由kafka服務器保存即可,因此提升了整體的吞吐量。 (2)實現靈活的消息處理 很多kafka的下游子系統(接受kafka消息的系統)都有這樣的需求:對於已經處理過的消息可能在未來的某個時間點重新處理一次,即所謂的消息消息重演(message replay)。消息持久化便可以很方便地實現這樣的需求。
3>.負載均衡和故障轉移
作為一個功能完備的分布式系統,kafka如果只提供了最基本的消息引擎功能肯定不足以幫助它脫穎而出。一套完整的消息引擎解決方案中必然要提供負載均衡(load balancing)和故障轉移(fail-over)功能。
何為負載均衡?顧名思義就是讓系統的負載根據一定的規則均衡地分配在所有參數工作的服務器上,從而最大限度的提升整體的運行效率。kafka實現負載均衡實際上是通過智能化的分區領導者選舉(partition leader election)來實現的。
除了負載均衡,完備的分布式系統還支持故障轉移,所謂故障轉移,是指當服務器意外終止時,整個集群可以快速的檢測到該失效(failure),並立即將該服務器上應用或服務自動轉移到其他服務器上。故障轉移通常是“心跳”和“會話“的機制來實現的。kafka服務器支持故障轉移的方式就是使用會話機制。每台kafka服務器啟動后會以會話的形式把自己注冊到zookeeper服務器上。一旦該服務運轉出現問題,與zookeeper的會話變不能維持從而超時失效,此時kafka集群會選舉出另外一台服務器來完全代替這台服務器繼續提供服務。
4>.伸縮性
所謂伸縮性,英文名是scalability。伸縮性表示想分布式系統中增加額外的計算資源(比如CPU,內存,存儲或帶寬)時吞吐量提升的能力。阻礙線性擴容的一個很常見的因素就是狀態的保存。我們知道,不論是哪類分布式系統,集群的每台服務器一定會維護很多內部狀態。如果由服務器自己來保存這些狀態信息,則必須處理一致性的問題。相反,如果服務器是無狀態的,狀態的保存和管理交與專門的協調服務來做(比如zookeeper)。那么整個集群的服務武器之間就無需繁重的狀態共享,這極大的降低了維護復雜度。倘若要擴容集群節點,只需要簡單的啟動新的節點集群和進行自動負載均衡就可以了。 Kafka正式采用了這樣的思想:每台kafka服務器上的狀態統一交友zookeeper保管。擴展kafka集群也只需要一步:啟動新的kafka服務器即可。當然這里需要言明的是,在kafka服務器上並不是所有的狀態信息都不保存,它只保存了很輕量級的內部狀態(比如從kakka 0.10.x版本之后,它將每個topic的消費者的偏移量自己維護了,把這些偏移量存放到了一個叫做“__consumer_offsets”的的topic進行維護)。
二.Kafka簡介
1>.什么是JMS
在Java中有一個角消息系統的東西,我們叫他Java Message Service,簡稱JMS。比如各種MQ。我們舉個簡單的例子,在java中進程之間通信需要socket,“路人甲”要向“路人乙”發送數據,需要“路人乙”開啟服務,暴露端口。這樣面臨的問題就是如果“路人乙”不在線,“路人甲”就不能發送數據給“路人乙”。
為了解決該問題就需要在“路人甲”和“路人乙”之間引入消息中間件,進行解耦。如下圖所示:
2>.JMS的兩種工作模式
第一種模式:點到點(point to point,簡稱P2P),典型的一對一模式(一個人發送數據的同時只有一個人接收數據),也有人稱之為端到端(peer to peer)或者隊列模式(queue)。
第二種模式:發布訂閱模式(publish subscribe,簡稱P-S),典型的一對多模式(一個人發送數據的同時可以有多個人接收數據),也有人稱為主題模式(在生產者和消費者之間加入了topic(主題),主題相當於公告欄,生產者發送消息到主題后,所有消費者都可以看到,功能類似於咱們平時接觸的微信公眾號)。
3>.Kafka的工作模式
Kafka的工作模式可以把JMS的兩種模式結合在一起,我們稱之為消費者組模式。
4>.什么是Kafka
Kafka最初是由LinkedIn公司開發,並於 2011年初開源。在流式計算中,Kafka一般用來緩存數據,Storm,Spark,Fink等通過消費Kafka的數據進行計算。Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。
作為一個流系統,Apache Kafka有三種關鍵能力: (1)發布和訂閱記錄流。在這個方面其等於一個消息隊列或企業級消息中間件,也是我們最熟悉的用法 (2)以容錯的方式存儲記錄流。這是Kafka和很多僅基於內存的消息隊列的重要區別 (3)對流中的記錄進行處理。這是Kafka 0.10后新增的能力(Kafka Streams) 為進一步學習其機制,你需要了解: (1)Kafka以集群形式運行在一到多台服務器上,每個Kafka工作進程稱為broker; (2)Kafka集群對記錄的流進行分類存儲,這種分類稱為主題(topic); (3)每條記錄由鍵(key:消息鍵,對消息做partition時使用,即決定消息被保存在某topic下的哪個partition)、值(value:消息體,保存實際的消息數據)、時間戳(timestamp:消息發送時間戳, 用於流式處理及其他依賴時間的處理語義。如果不指定,則取當前時間)等,感興趣的可以深入了解一下Kafka的消息格式,網上有很多相關資料,我這里就不當搬運工了.
5>.Kafka版本
Kafka是一種高吞吐,分布式,基於發布訂閱的流系統。最初由LinkedIn公司開發,后成為Apache頂級項目。目前(20190710)社區版本最新版本為2.2.x,從0.10.0版本開始核心設計沒有大的變化,API也基本穩定。 如下圖所示,我們可以對比 Cloudera Distribution of Apache Kafka(簡稱CDK)和 Apache Kafka的對應關系,原鏈接為:https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html#concept_fzg_phl_br。
6>.Kafka核心API
如下圖所示,Kafka有四類核心API:
(1)Producer API
允許應用程序將記錄流發布到一個或多個topic中.
(2)Consumer API
允許應用程序從一個或多個topic中訂閱記錄流.
(3)Streams API
允許應用程序作為流處理器,從一個或多個topic中消費輸入流,並向一個或多個topic寫出輸出流,在輸入流和輸出流中做轉換.
(4)Connector API
允許創建和運行可重用的生產者或消費者,其將topic連接到現有應用程序或數據系統,例如一個到關系型數據庫的connector可將表的變動捕捉到topic中.
7>.kafka特點
第一:可以處理大量數據,TB級別; 第二:高吞吐量,支持每秒種百萬消息,傳輸速度可達到300MB/s; 第三:分布式,支持在多個Server之間進行消息分區; 第四:多客戶端支持,和多種語言進行協同; 第五:它是一個集群,擴容起來也相當方便;
三.kafka消息隊列
1>.kafka消息隊列內部實現原理
點對點模式(一對一,消費者主動拉取數據,消息收到后消息清除,pull) 點對點模型通常是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監聽者也是如此。 發布/訂閱模式(一對多,數據生產后,推送給所有訂閱者,push) 發布訂閱模型則是一個基於推送的消息傳送模型。發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使當前訂閱者不可用,處於離線狀態。
2>.為什么需要消息隊列
(1)解耦: 允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。 (2)冗余: 消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 (3)擴展性: 因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。 (4)靈活性 & 峰值處理能力: 在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。 (5)可恢復性: 系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。 (6)順序保證: 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性) (7)緩沖: 有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。 (8)異步通信: 很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
3>. Kafka架構圖簡介
Kafka關鍵名詞介紹: 一.Producer : 消息生產者,就是向kafka broker發消息的客戶端。 二.Consumer : 消息消費者,向kafka broker取消息的客戶端 三.Topic : 可以理解為一個隊列,它是Kafka管理消息的實例。 四.Consumer Group (我們這里簡稱CG,即消費者組): 這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制-給consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。 關於Consumer Group我們要注意以下幾點: 1>.在同一個CG的中,同時只能有一個consumer對topic進行消費; 2>.在同一個CG中,所有的consumer是不會重復消費數據的,也就是說,同一個topic中的某個Partition中的數據被當前CG的一個consumer消費后,是不會再被這個GC中的其它consumer再次進行消費啦; 3>.在同一個CG中,每一個consumer消費單元是都以Partition為消費單元的,換句話說,在同一個CG中,只要consumer和topic中的Partition建立RPC連接后,那么這個Partition中的所有數據只會被這個consumer消費。 五.Broker : 一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。 六.Partition: Kafka集群中,Partition是生產者和消費者操作的最小單元。為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。 七.Offset: kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka 八.Zookeeper集群 Zookeeper保存的東西有兩個: 1>.Kafka集群節點的狀態信息(便於管理leader和follower角色); 2>.消費者當前正在消費消息的狀態信息(比如保存消費者消費的偏移量(Offset))。 溫馨提示:它並沒有保存生產者的一些元數據信息。 九.Replica Kafka存儲數據的分區分為主從,即Leader和Follower。也就是說,kafka自己就有冗余機制,它將數據寫入Leader的Partition之后,會將這份數據拷貝到其它broker的follower的Partition之中。溫馨提示:這個存儲在follower的Partition數據不會直接和消息生成者溝通,更不會跟消息消費者進行溝通,它僅僅是起到一個數據備份的作用!當Kafka的leader節點掛掉時,follower的各個節點會重寫選舉出新的leader,並由新的leader向外提供服務。 十.Event Kafka集群保存消息是以Partition去保存的,每一個Partition是按照隊列去保存的,消息是以Event來包裝消息的,一個消息就是一個event。因此每個Partition的消息是有序的,多個Partition之間的消息是無序的!
四.Kafka核心概念
1>.topic和partition
在概念上來說,topic只是一個邏輯概念,代表了一類消息,也可以認為是消息被發送到的地方。通常我們可以使用topic來區分實際業務,比如業務A使用一個topic,業務B使用另一個topic。從本質上說,每個Kafka topic都由若干個partition組成,而Kafka的partition是不可修改的有序消息序列,也就是說是有序的消息日志。每個partition有自己專屬的partition號,通常是從0開始的。用戶堆partition我唯一能做的操作就是在消息序列的尾部追加寫入消息。
partition上的每條消息都會被分配一個唯一的序列號,按照Kafka的術語來講,該序列號被稱為位移(offset)。該位移值是從0開始順序遞增的證書。位移信息可以唯一定義到某partition下的一條消息。值得一提的是,Kafka的partition實際上並沒有太多的業務含義,它的引入就是單純的為了提升系統的吞吐量,因此在創建Kafka topic的時候可以根據集群實際配置設置具體的partition數,實現整體性能的最大化。
2>.offset
上面說過,topic partition下的每條消息都被分配了一個位移值。實際上,Kafka消費者端也有位移(offset)的概念,但一定要注意這兩個offset屬於不同的概念。
顯然,每條消息在某個partition的位移是固定的,但消費該partition的消費者的位移是會隨着消費進度不斷遷移,但終究不可能超過該分區最新一條消息的位移。綜合之前說的topic,partition和offset,我們可以斷言Kafka中的一條消息其實就是一個<topic,partition,offset>三元組(tuple),通過該元組值我們可以在Kafka集群中找到位移對應的那條消息。
3>.Replica
既然我們已知partition是有序的消息日志,那么一定不能只保存者一份日志,否則一旦保存在partition的Kafka服務器掛掉了,其上保存的消息也就都丟失了。分布式系統必然要實現高可靠性,而目前實現的主要途徑還是依靠冗余機制。換句話說,就是備份多份日志。這些分貝日志在Kafka中被稱為副本(replica),它們存在的唯一目的就是防止數據丟失,這一點一定要記住!
4>.leader和follower
副本(replia)分為兩類:領導者副本(leader replia)和追隨者副本(follower replia)。follower replica是不能提供服務給客戶端的,也就是說不負責響應客戶端發來的消息寫入和消息消費請求。它只是被動地向領導者副本(leader replia)獲取數據,而一旦leader replica 所在的broker宕機,Kafka會從剩余的replica中選舉出新的leader繼續提供服務。
Kafka保證同一個partition的多個replica一定不會分配在同一台broker上。畢竟如果同一個broker上有同一個partition的多個replica,那么將無法實現備份冗余的效果。
5>.producer
生產者將數據發布到它們指定的topics。生產者負責選擇將記錄分配到topic中的哪個分區。可以以round-robin方式分配以簡單地負載均衡,或可以按可以按某個分區函數(基於記錄的鍵來計算)來分配。
6>.consumer
消費者擁有一個消費者組(consumer group)名,topic的每條記錄會被 傳輸給其消費者組中的唯一一個消費者。消費者實例可以是單獨的線程,也可以位於單獨的機器上。
如果所有的消費者實例有相同的消費者組名,記錄會在這些消費者實例間 有效地負載均衡。如果所有的消費者實例都屬於不同的消費者組,則每條 記錄會被廣播到所有的消費者線程上。
Kafka中消費的實現,是將log中的分區划分到消費者實例上,使得任何時 刻每個實例都是分區“公平划分”后的排他的消費者。這個管理組內成員 的過程是由Kafka協議動態處理的。如果新實例加入消費者組,它們會從 其他組內成員接管部分分區;如果一個實例掛掉,其分區會被分配給其他 實例。 Kafka只提供分區內記錄的全局有序,而不是某topic內不同分區間的全局 有序。分區有序加上按鍵分區數據的能力對大多數應用來說足夠。然而, 如果一定需要記錄的全局有序,可以通過指定topic只有一個分區實現, 但這意味着每個消費者組只能有一個消費線程。
下圖為2節點的Kafka集群維持4個分區(P0-P3),有2個消費者組。消費 者組A有2個消費者,消費者組B有4個消費者。
7>.broker
Kafka是一個分布式消息隊列。Kafka對消息保存是根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。
無論是kafka集群,還是producer和consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。因此Zookeeper在生產環境中最少建議部署3台,如果集群較大(100個節點以上)推薦配置5台。
8>.ISR
ISR的全稱是in-sync replica,翻譯過來就是與leader replica保持同步的replica集合。這是一個特別重要的概念。前面講了很多關於Kafka的副本機制,比如一個partition可以配置N個replica,那么這是否就意味着對副本因子為N的topic,可容忍最多N-1個服務器失效,不會丟失任何已提交到broker的記錄呢?答案是:“否”!(網上有資料說是可以保證數據不丟失,如果在min.insync.replicas使用默認值為1且unclean.leader.election.enable的值為true時,可能會導致數據丟失,而Producer的acks設置的為1時,也就是當leader的parition寫入成功就認為數據是寫入成功的,我們知道follower是有可能和leader節點不一致的!因此當leader節點掛掉的話,此時的數據就丟失啦!) 副本數對Kafka的吞吐率是有一定的影響,但極大的增強了可用性。默認情況下Kafka的replica數量為1,即每個partition都有一個唯一的leader,為了確保消息的可靠性,通常應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置為大於1,比如3。 所有的副本(replicas)統稱為Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower踢出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。相反的,當這些replicas重新“追上”了leader的進度時,那么Kafka會將他們加回到ISR中。這一切都是自動維護的,不需要用戶進行人為干預,因而在保證了消息交付語義的同時,還簡化了用戶的操作成本。
9>.高水位線
High watermark(高水位線)以下簡稱HW,表示消息被commit的部分,leader(和followers,看情況)確認寫入本地log並返回ack后這些數據才被認為是committed(kafka的參數min.insync.replicas可以控制多少個副本ack后才算commit,默認為1,即只需要leader寫成功該段log就committed,此時ISR可能最小只有1,如果設為n,則表示leader和n-1個followers都確認寫入才算committed),也就是可以被消費,所以在HW位置以下的消息都可以被消費。(在min.insync.replicas=1時,如果此時ISR只有1個replica,leader又掛掉,又設置了unclean.leader.election.enable=true,則會有一個非ISR內的follower成為新的leader,此時HW會縮小,因此consumer的lag值可能出現負數!這些配置在Apache Kafka 0.10都是默認值,因此很可能出現) Log end offset(日志結束位置)以下簡稱LEO,表示消息的最后位置。LEO>=HW,一般會有沒提交的部分(除了ISR中最慢的follower,事實上HW=min(ISR中各副本的LEO))。uncommitted部分的消息,需要等待如何向producer進行ack,對應的請求會停留在一塊稱為purgatory(煉獄)的區域中。具體怎么ack,取決於producer的acks設置:如果設置為0,則producer發出消息后就不等待ack,是at most once的一致性;如果設為1(默認值,注意!),則只要leader的log成功寫入請求就返回了,但如果在ack發送給producer前leader就掛了(或丟包了),producer仍然收不到ack,會對請求進行重發,此時數據就有可能重,是at least once的一致性。但在leader掛掉時,leader有但未被followers同步的數據又無法同步,會丟一部分數據;如果設置為all或-1,則需要所有ISR中的followers確認寫入后請求才返回(注意,如果允許ISR最少只有1個成員,則和acks=1又沒有實質區別了!),也是at least once的一致性,且基本不存在丟數據的可能。 因此,對於producer來說,保證數據不丟(producer送出的消息均一直能被消費到)需要滿足以下條件:min.insync.replicas至少設為2、unclean.leader.election.enable設為false、producer的acks設為all。但這種設置會對kafka的吞吐率、故障恢復時間造成巨大影響,究竟要不要保證這么強的一致性,就需要你評估了。
以上討論的都與producer相關。對於consumer一側,情況則比較明朗,根據同步還是異步消費、commit調用的時機決定了at least once還是atmost once,大家可參考文檔或圖書進行理解。
10>.reblance掃盲
一.rebalance簡介
consumer group的rebalance本質上是一組協議,它規定了一個consumer group 是如何達成一致來分配訂閱topic的所有分區的。假設某個組下有20個consumer實例,該組訂閱一個有着100個分區的topic。正常情況下,Kafka會為每個consumer平均分配5個分區。這個分配過程就被稱為rebalance。
當consumer成功執行rebalance后,組訂閱topic的每個分區只會分配給組內一個consumer實例。換句話說,同一個消費者組的消費者不能同時對同一個topic的同一個分區進行消費。
和舊版本consumer依托於zookeeper進行rebalance不同,新版本consumer使用了Kafka內置的一個全新的協議(group coordination protocol)。對於每個組而言,Kafka的某個broker會被選舉為組協調者(group coordinator)。coordinator負責對組對狀態進行管理,他的主要責任就是當新成員到達時促成組內所有成員達成新對分區分配方案,即coordinator負責對組執行rebalance操作。
二.rebalance觸發條件
組rebalance觸發對條件有以下3個:
第一:組成員發生變更,比如新consumer加入組,或已有consumer主動離開組,再或是已有consumer崩潰時則觸發rebalance;
第二:組訂閱topic數發生變更,比如使用基於正則表達式對訂閱,當匹配正則表達式對新topic被創建時則會觸發rebalance;
第三:組訂閱topic時分區發生變更,比如使用命令行腳本增加了訂閱topic的分區數;
真實應用場景引發rebalance最常見的原因就是違背了第一個條件(比如flume的kafka source相對於broker集群來說就是consumer對象),特別是consumer崩潰的情況。這里的崩潰不一定就是指consumer進程“掛掉”或consumer進程所在的機器宕機。當consumer無法在指定的時間內完成消息處理,那么coordinator就認為該consumer已經崩潰,從而引發新一輪rebalance。
我在生產環境中也使用flume消費kafka的數據到hdfs集群上,也遇到過這種rebalance的情況,最終分析原因是:該group下的consumer處理消息的邏輯過重,而且事件處理時間波動很大,非常不穩定,從而導致coordinator會經常行的認為某個consumer已經掛掉,引發rebalance。鑒於目前一次rebalance操作的開銷很大,生產環境中用戶一定要結合自身業務特點仔細調優consumer參數:“request.timeout.ms”,“max.poll.records”和“max.poll.interval.ms”這幾個參數,以避免不必要的rebalance出現。
三.rebalance協議
前面我們提到過rebalance本質上是一組協議。group於coordinator共同使用這組協議完成group的rebalance。最新版本的Kafka中提供了下面5個協議來處理rebalance相關事宜。
第一:JoinGroup請求:consumer請求加入組;
第二:SyncGroup請求:group leader 把分配方案同步更新到組內所有成員中;
第三:Heartbeat請求:consumer定期向coordinator匯報心跳表明自己依然存活;
第四:LeaveGroup請求:consumer主動通知coordinator該consumer即將離組;
第五:DescribeGroup請求:查看組的所有信息,包括成員信息,協議信息,分配方案以及訂閱信息等。該請求類型主要供管理員使用。coordinator不使用該請求執行rebalance。
在rebalance過程中,coordinator主要處理consumer發過來的joinGroup和SyncGroup請求。當consumer主動離組時會發送LeaveGroup請求給coordinator。
在成功rebalance過程中,組內所有consumer都需要定期向coordinator發送Hearbeat請求。而每個consumer也是根據Heartbeat請求的響應中是否包含REBALANCE_IN_PROGRESS來判斷當前group是否開啟來新一輪rebalance。
好啦~關於rebalance咱們了解到這里基本上就夠用來,感興趣的小伙伴可以查看rebalance genneration,rebalance流程,rebalance監聽器等技術,我們這里就不用深入探討啦~
五.Kafka的使用場景
Kafka以消息引擎聞名,因此它特別適合處理生產環境中的那些流式數據。以下就是Kafka在實際應用中一些典型的使用場景。
1>.消息傳輸
Kafka非常適合替代傳統的消息總線(message bus)或消息代理(message broker)。傳統的這類系統擅長於解耦生產者和消費者以及批量處理消息,而這些特點Kafka都具備。除此之外,Kafka還具有更好的吞吐量特性,其內置的分區機制和副本機制既實現了高性能的消息傳輸,同時還達到了高性能的高容錯性。一次Kafka特別適合用於實現一個超大量級消息處理應用。
2>.網站行為日志追蹤
Kafka最早就是用於重建用戶行為數據追蹤系統的。很多網站上的用戶操作都會以消息的形式發送到Kafka的某個對應的topic上。這些點擊流蘊含了巨大的商業價值,事實上,目前就有很多創業公司使用機器學習或其他實時處理框架來幫助收集並分析用戶的點擊流數據。鑒於這種點擊流數據量是很大的,Kafka超強的吞吐量特性此時就有了用武之地。
3>.審計數據收集
很多企業和組織都需要對關鍵的操作和運維進行監控和審計。這就需要從各個方面運維應用程序處實時匯總操作步驟信息進行集中式管理。在這種使用場景下,你會發現Kafka是非常適合的解決方案,它可以便捷的對多路消息進行實時收集,同時由於其持久化的特性,是的后續離線審計稱為可能。
4>.日志收集
這可能是Kafka最常見的使用方式了(日志收集匯總解決方案),每個企業都會產生大量的服務日志,這些日志分散在不同的機器上。我們可以使用Kafka對他們進行全量收集,並集中往下游的分布式存儲中(比如HDFS等)。比起其他主流的日志抽取框架(比如Apache Flume),Kafka有更好的性能,而且提供了完備的可靠性解決方案,同時還保持 了低延遲的特點。
5>.Event Sourcing
Event Sourcing實際上是領域驅動設計(Domain-Driven Design,簡稱DDD)的名次,它使用事件序列來表示狀態變更,這種思想和Kafka的設計特性不謀而合。還記得吧,Kafka也是用不可變更的消息序列來抽象化表示業務信息的,因此Kafka特別適合作為這種應用的后端存儲。
6>.流式處理
很多用戶接觸到Kafka都是因為它的消息存儲引擎。自0.10.0.0版本開始,Kafka社區推出了一個全新的流式組件 Kafka Streams。這標志着Kafka正式進入流式處理框架俱樂部。相比老牌流式處理框架Apache Storm,Apache Samza,或是最近風頭正勁的Spark Streaming,抑或是Apache Flink,Kafka Streams的競爭力如何?讓我們拭目以待吧!
六.集群環境規划
1>.操作系統的選型
我們知道Kafka依賴於Java環境,因此我們只要能在操作系統上安裝jdk理論上就可以部署kafka環境了。沒錯,事實上kafka的確可以運行在主流的操作系統上,比如windows,Linux,mac OS等等。但是這么多操作系統我們究竟應該選擇哪個操作系統去安裝呢?為什么大家部署kafka集群都選擇的是Linux環境呢?其實咱們是可以分析原因的: 第一:Kafka新版本clients在設計底層網絡庫時采用了Java的Selecor機制(NIO),而后者在Linux實現機制就是epoll;但是在window平台上,Java NIO的Selector底層是使用select模型而非IOCP實現的,只有Java NIO2才是使用IOCP實現的。因此這一點上,在Linux部署Kafka要在比Windows上部署能夠得到高效的I/O處理能力; 第二:對於數據網絡傳輸效率而言,Linux也更具有優勢。具體來說,Kafka這種應用必然需要大量的通過網絡於磁盤進行數據傳輸,而大部分這樣的操作都是通過Java的FileChannel.transferTo方法實現的,在Linux平台上該方法底層會調用sendfile系統調用,即采用了Linux提供的零拷貝(Zero Copy)技術。
Kafka可以在ext4或xfs上很好的工作。有一些掛載優化選項可用,例如noatime,data=writeback等,但除noatime強烈建議使用外,其余選項對效率的提升有限,可以參考官網文件進行配置。
另一個問題是要不要調整 文件刷寫行為。Kafka的寫操作立即將數據寫到文件系統層面,但文件系統是否馬上將數據刷寫的到磁盤上,則有一些選項可以控制。例如,Kafka的默認行為是應用內不調用fsync,而是由OS的后台線程進行刷寫;也可以周期性地調用fsync確保數據已經刷寫到磁盤。良好的事件是,不要改變默認的行為。在文件系統層面也不需要做額外的調整。
2>.磁盤規划
事實上,根據公開的資料顯示,LinkedIn公司的Kafka集群就是使用RAID 10作為底層存儲的。除了默認提供的數據冗余之外,RAID 10 還可以將數據自動的負載分布到多個磁盤上。由此可見,RAID作為Kafka的底層存儲其實主要的優勢有兩個: 第一:提供冗余的數據存儲空間; 第二:天然提供負載均衡;
以上兩個優勢對於任何系統而言都是很好的特性。不過對於Kafka而言,Kafka在框架層面其實已經提供了這兩個特性:通過副本機制提供冗余和高可靠性,以及通過分散到各個節點的領導者選舉機制來實現負載均衡,所以從這方面來看,RAID的優勢就顯得不是那么明顯了。事實上,LinkedIn公司目前正在計划將整個Kafka集群從RAID 10 遷移到JBOD上,只不過在整個過程中JBOD方案需要解決當前的Kafka一些固有缺陷,比如: 第一:任意磁盤損壞都會導致broker宕機,普通磁盤損壞的概率是很大的,因此這個缺陷從某種程度上來說是致命的。不過社區正在改進這個問題,未來版本中只要為broker配置的多塊磁盤中還有良好的磁盤,broker就不會掛掉。 第二:JBOD的管理需要更加細粒度化,目前Kafka沒有提供腳本或其他工具用於在不同磁盤間進行手動分配,但這是使用JBOD方案中必要的功能。 第三:JBOD也應該提供類似於負載均衡的功能,目前只是間的依賴輪訓的方式為副本數據選擇磁盤,后續需要提供更加豐富的策略。
結合JBOD和RAID之間的優劣對比以及LinkIn公司的實際案例,咱們可以給硬盤規划的結論性總結如下: 第一:追求性價比的公司可以考慮使用JBOD; 第二:使用機械硬盤完全可以滿足Kafka集群的使用,SSD更好(盡量不要使用NAS(Network Attached Storage)這樣的網絡存儲設備。);
3>. 磁盤容量規划
對於磁盤容量的規划和以下結果因素有關: 第一:新增消息數; 第二:消息留存時間; 第四:平均消息大小; 第五:副本數; 第六:是否啟用壓縮; 根據實際情況而定,比如我們線上的服務器磁盤使用率始終達不到50,盡管我默認保留了168小時,即數據保留了7天喲~就會存在資源浪費的情況,雖然說你可以為了使用這些存儲空間,可以在該服務器上搭建其他網絡存儲服務,但你無法確定搭建的這個服務是否在高峰期時會影響到kafka集群的性能! [root@kafka116 ~]# df -h Filesystem Size Used Avail Use% Mounted on /dev/mapper/centos_localhost-root 50G 27G 24G 53% / devtmpfs 16G 0 16G 0% /dev tmpfs 16G 0 16G 0% /dev/shm tmpfs 16G 1.6G 14G 11% /run tmpfs 16G 0 16G 0% /sys/fs/cgroup /dev/sda1 1014M 143M 872M 15% /boot /dev/mapper/centos_localhost-home 80T 26T 55T 32% /home tmpfs 3.2G 0 3.2G 0% /run/user/0 tmpfs 3.2G 0 3.2G 0% /run/user/1003 [root@kafka116 ~]#
4>.內存規划
Kafka對於內存對使用可稱作其設計亮點之一。雖然在前面我們強調了Kafka大量依靠和磁盤來保存消息,但其實它還會對消息進行緩存,而這個消息換粗你得地方就是內存,具體來說是操作系統對頁緩存(page cache)。Kafka雖然會持久化每條消息,但其實這個工作都是底層對文件系統來完成。Kafka僅僅將消息寫入page cache而已,之后將消息“flush”到磁盤對任務完全交由操作系統來完成。
一般情況下,broker所需的堆內存都不會超過6GB。所以對於一台16GB內存的機器而言,文件系統page cache的大小甚至可以達到10~14GB!總之對於內存規划的建議如下:
第一:盡量分配更多的內存給操作系統的page cache;
第二:不要為broker設置過大的堆內存,最好不超過6GB;
第三:page大小至少要大於一個日志段的大小;
5>.CPU規划
比起磁盤和內存,CPU於kafka而言並沒有那么重要,嚴格來說,kafka不屬於計算密集型(CPU-bound)的系統,因此對於CPU需要記住一點就可以了:追求多核而非高時鍾頻率。咱們對CPU資源規划如下: 第一:使用多核系統,CPU核數最好大於8; 第二:如果使用Kafka 0.10.0.0之前的版本或clients端消息版本不一致(若無顯式配置,這種情況多半由clients和broker版本不一致造成),則考慮多配置一些資源以防止消息解壓操作消耗過多CPU)。
6>.帶寬規划
第一:盡量使用高速網絡;
第二:根據自身網絡條件和帶寬來評估Kafka集群機器數量;
第三:避免使用跨機房網絡;
7>.關於JVM
需要使用1.8以上的JDK。推薦使用G1GC,其次可選擇ParNew+CMS的組合(但是做的相應的調整也比較多)。設置充足的堆大小。以下是一個示范例子:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
8>.典型線上環境配置
下面給出一份典型的線上環境配置,用戶可以參考這份配置以及結合自己的是實際情況進行二次調整: CPU 24核心; 內存 32GB; 磁盤 1TB 7200轉SAS盤2快; 帶寬:1Gb/s; ulimit -n 1000000; Socket Buffer 至少64KB,適合於跨機房網絡傳輸;
七.博主推薦閱讀Kafka相關書籍
《Kafka權威指南》
《Apache Kafka實戰》