一、Kafka消費者源碼介紹
1.分區消費模式源碼介紹
分區消費模式直接由客戶端(任何高級語言編寫)使用Kafka提供的協議向服務器發送RPC請求獲取數據,服務器接受到客戶端的RPC請求后,將數據構造成RPC響應,返回給客戶端,客戶端解析相應的RPC響應獲取數據。
Kafka支持的協議眾多,使用比較重要的有:
獲取消息的FetchRequest和FetchResponse
獲取offset的OffsetRequest和OffsetResponse
提交offset的OffsetCommitRequest和OffsetCommitResponse
獲取Metadata的Metadata Request和Metadata Response
生產消息的ProducerRequest和ProducerResponse
2.組消費模式源碼介紹
3.兩種消費模式服務器端源碼對比
分區消費模式具有以下特點:
指定消費topic、partition和offset通過向服務器發送RPC請求進行消費;
需要自己提交offset;
需要自己處理各種錯誤,如:leader切換錯誤
需要自己處理消費者負載均衡策略
組消費模式具有以下特點:
最終也是通過向服務器發送RPC請求完成的(和分區消費模式一樣);
組消費模式由Kafka服務器端處理各種錯誤,然后將消息放入隊列再封裝為迭代器(隊列為FetchedDataChunk對象) ,客戶端只需在迭代器上迭代取出消息;
由Kafka服務器端周期性的通過scheduler提交當前消費的offset,無需客戶端負責
Kafka服務器端處理消費者負載均衡
監控工具Kafka Offset Monitor 和Kafka Manager 均是基於組消費模式;
所以,盡可能使用組消費模式,除非你需要:
自己管理offset(比如為了實現消息投遞的其他語義);
自己處理各種錯誤(根據自己業務的需求);
二、Kafka生產者源碼介紹
1.同步發送模式源碼介紹
2.異步發送模式源碼介紹
3.兩種生產模式服務器端源碼對比
同步發送模式具有以下特點:
同步的向服務器發送RPC請求進行生產;
發送錯誤可以重試;
可以向客戶端發送ack;
異步發送模式具有以下特點:
最終也是通過向服務器發送RPC請求完成的(和同步發送模式一樣);
異步發送模式先將一定量消息放入隊列中,待達到一定數量后再一起發送;
異步發送模式不支持發送ack,但是Client可以調用回調函數獲取發送結果;
所以,性能比較高的場景使用異步發送,准確性要求高的場景使用同步發送
三、Kafka Server Reactor設計模型
1.認識Java NIO
Java NIO由以下幾個核心部分組成 :
Channels;
Buffers;
Selectors

2.認識Linux epoll模型
epoll 是一種IO多路復用技術 ,在linux內核中廣泛使用。常見的三種IO多路復用技術為select模型、poll模型和epoll模型。
select 模型需要輪詢所有的套接字查看是否有事件發生 。缺點: (1)套接字最大支持1024個;(2)主動輪詢效率很低;(3) 事件發生后需要將套接字從內核空間拷貝到用戶空間,效率低
poll模型和select模型原理一樣,但是修正了select模型最大套接字限制的缺點;
epoll模型修改主動輪詢為被動通知,當有事件發生時,被動接收通知。所以epoll模型注冊套接字后,主程序可以做其他事情,當事件發生時,接收到通知后再去處理。修正了select模型的三個缺點(第三點使用共享內存修正)。
Java NIO的Selector模型底層使用的就是epoll IO多路復用模型
3.Kafka Server Reactor模型
Kafka SocketServer是基於Java NIO開發的,采用了Reactor的模式(已被大量實踐證明非常高效,在Netty和Mina中廣泛使用)。Kafka Reactor的模式包含三種角色:
Acceptor;
Processor ;
Handler;
Kafka Reacator包含了1個Acceptor負責接受客戶端請求,N個Processor線程負責讀寫數據(為每個Connection創建出一個Processor去單獨處理,每個Processor中均引用獨立的Selector),M個Handler來處理業務邏輯。在Acceptor和Processor,Processor和Handler之間都有隊列來緩沖請求。
Acceptor的主要職責是監聽客戶端的連接請求,並建立和客戶端的數據傳輸通道,然后為這個客戶端指定一個Processor,它的工作就到此結束,這樣它就可以去響應下一個客戶端的連接請求了;
Processor的主要職責是負責從客戶端讀取數據和將響應返回給客戶端,它本身不處理具體的業務邏輯,每個Processor都有一個Selector,用來監聽多個客戶端,因此可以非阻塞地處理多個客戶端的讀寫請求,Processor將數據放入RequestChannel的RequestQueue 中和從ResponseQueue讀取響應 ;
Handler(kafka.server.KafkaRequestHandler,kafka.server.KafkaApis)的職責是從RequestChannel中的RequestQueue取出Request,處理以后再將Response添加到RequestChannel中的ResponseQueue中;
四、Kafka Partition Leader選舉機制
1.大數據常用的選主機制
Leader選舉算法非常多,大數據領域常用的有 以下兩種:
Zab(zookeeper使用);
Raft;
……
它們都是Paxos算法的變種。
Zab協議有四個階段:
Leader election;
Discovery(或者epoch establish);
Synchronization(或者sync with followers)
Broadcast
比如3個節點選舉leader,編號為1,2,3。1先啟動,選擇自己為leader,然后2啟動首先也選擇自己為 leader,由於1,2都沒過半,選擇編號大的為leader,所以1,2都選擇2為leader,然后3啟動發現1,2已經協商好且數量過半,於是3也選擇2為leader,leader選舉結束。
在Raft中,任何時候一個服務器可以扮演下面角色之一
Leader: 處理所有客戶端交互,日志復制等,一般只有一個Leader;
Follower: 類似選民,完全被動
Candidate候選人: 可以被選為一個新的領導人
啟動時在集群中指定一些機器為Candidate ,然后Candidate開始向其他機器(尤其是Follower)拉票,當某一個Candidate的票數超過半數,它就成為leader。
2.常用選主機制的缺點
由於Kafka集群依賴zookeeper集群,所以最簡單最直觀的方案是,所有Follower都在ZooKeeper上設置一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時所有Follower都嘗試創建該節點,而創建成功者(ZooKeeper保證只有一個能創建成功)即是新的Leader,其它Replica即為Follower。
前面的方案有以下缺點:
split-brain (腦裂): 這是由ZooKeeper的特性引起的,雖然ZooKeeper能保證所有Watch按順序觸發,但並不能保證同一時刻所有Replica“看”到的狀態是一樣的,這就可能造成不同Replica的響應不一致 ;
herd effect (羊群效應): 如果宕機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成集群內大量的調整;
ZooKeeper負載過重 : 每個Replica都要為此在ZooKeeper上注冊一個Watch,當集群規模增加到幾千個Partition時ZooKeeper負載會過重
3.Kafka Partition選主機制
Kafka 集群controller的選舉過程如下 :
每個Broker都會在Controller Path (/controller)上注冊一個Watch。當前Controller失敗時,對應的Controller Path會自動消失(因為它是ephemeral Node),此時該Watch被fire,所有“活”着的Broker都會去競選成為新的Controller(創建新的Controller Path),但是只會有一個競選成功(這點由Zookeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上注冊Watch。因為Zookeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注冊。
Kafka partition leader的選舉過程如下 (由controller執行):
從Zookeeper中讀取當前分區的所有ISR(in-sync replicas)集合
調用配置的分區選擇算法選擇分區的leader
所以,對於下圖partition 0先選擇broker 2,之后選擇broker 0作為leader;對於partition 1 先選擇broker 0,之后選擇broker 1作為leader;partition 2先選擇broker 1,之后選擇broker 2作為leader。