1.概述
在對Kafka使用層面掌握后,進一步提升分析其源碼是極有必要的。縱觀Kafka源碼工程結構,不算太復雜,代碼量也不算大。分析研究其實現細節難度不算太大。今天筆者給大家分析的是其核心處理模塊,core模塊。
2.內容
首先,我們需要對Kafka的工程結構有一個整體的認知度,Kafka 大家最為熟悉的就是其消費者與生產者。然其,底層的存儲機制,選舉機制,備份機制等實現細節,需要我們對其源碼仔細閱讀學習,思考與分析其設計之初的初衷。下面,我們首先來看看Kafka源碼工程模塊分布,截止當天日期,官方托管在 Github 上的 Kafka 源碼版本為:0.10.2.1,其工程分布結構如下圖所示:
這里筆記只針對core模塊進行說明,其他模塊均是啟動腳本,文檔說明,測試類或是Java客戶端的相關代碼,本篇博客就不多做贅述了。
模塊名 | 說明 |
admin | kafka的管理員模塊,操作和管理其topic,partition相關,包含創建,刪除topic,或者拓展分區等。 |
api | 主要負責數據交互,客戶端與服務端交互數據的編碼與解碼。 |
client | 該模塊下就一個類,producer讀取kafka broker元數據信息,topic和分區,以及leader。 |
cluster | 這里包含多個實體類,有Broker,Cluster,Partition,Replica。其中一個Cluster由多個Broker組成,一個Broker包含多個Partition,一個Topic的所有Partition分布在不同的Broker中,一個Partition包含多個Replica。 |
common | 這是一個通用模塊,其只包含各種異常類以及錯誤驗證。 |
consumer | 消費者處理模塊,負責所有的客戶端消費者數據和邏輯處理。 |
controller | 此模塊負責中央控制器的選舉,分區的Leader選舉,Replica的分配或其重新分配,分區和副本的擴容等。 |
coordinator | 負責管理部分consumer group和他們的offset。 |
javaapi | 提供Java語言的producer和consumer的API接口。 |
log | 這是一個負責Kafka文件存儲模塊,負責讀寫所有的Kafka的Topic消息數據。 |
message | 封裝多條數據組成一個數據集或者壓縮數據集。 |
metrics | 負責內部狀態的監控模塊。 |
network | 該模塊負責處理和接收客戶端連接,處理網絡時間模塊。 |
producer | 生產者的細節實現模塊,包括的內容有同步和異步的消息發送。 |
security | 負責Kafka的安全驗證和管理模塊。 |
serializer | 序列化和反序列化當前消息內容。 |
server | 該模塊涉及的內容較多,有Leader和Offset的checkpoint,動態配置,延時創建和刪除Topic,Leader的選舉,Admin和Replica的管理,以及各種元數據的緩存等內容。 |
tools | 閱讀該模塊,就是一個工具模塊,涉及的內容也比較多。有導出對應consumer的offset值;導出LogSegments信息,以及當前Topic的log寫的Location信息;導出Zookeeper上的offset值等內容。 |
utils | 各種工具類,比如Json,ZkUtils,線程池工具類,KafkaScheduler公共調度器類,Mx4jLoader監控加載器,ReplicationUtils復制集工具類,CommandLineUtils命令行工具類,以及公共日志類等內容。 |
3.源碼環境
閱讀Kafka源碼需要准備以下環境:
- JDK
- IDE(Eclipse,IDEA或者其他)
- gradle
關於環境的搭建,大家可以利用搜索引擎去完成,比較基礎,這里就不多贅述了。然后在源碼工程目錄下執行以下命令:
- gradle idea(編輯器為IDEA)
- gradle eclipse(編輯器為Eclipse)
如何選擇,可按照自己所使用的編輯器即可。這里筆者所使用的是IDEA,執行命令后,會在源碼目錄生成以下文件,如下圖所示:
然后,在編輯器中導入該源碼項目工程即可,如下圖所示:
4.運行源碼
這里,我們先在config模塊下設置server.properties文件,按照自己的需要設置,比如分區數,log的存儲路徑,zookeeper的地址設置等等。然后,我們在編輯器中的運行中設置相關的啟動參數,如下圖所示:
啟動類Kafka.scala在core模塊下,需要注意的是,這里在啟動Kafka之前,確保我們之前在server.properties文件中所配置的Zookeeper集群已正常運行,然后我們在編輯器中運行Kafka源碼,如下圖所示:
5.預覽結果
這里,我們做一下簡單的修改,在啟動類的開頭打印一句啟動日志和啟動時間,部分運行日志和運行結果截圖如下所示:
Start Kafka,DateTime[1494065094606] [2017-05-06 18:04:54,830] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = 0 broker.id.generation.enable = true broker.rack = null compression.type = producer connections.max.idle.ms = 600000 controlled.shutdown.enable = true
如上圖,紅色框即是我們簡單的添加的一句代碼。
編譯源代碼:
./gradlew releaseTarGz -x signArchives
6.總結
本篇博客給大家介紹了Kafka源碼的core模塊下各個子模塊所負責的內容,以及如何便捷的去閱讀源碼,以及在編輯器中運行Kafka源碼。后續,再為大家分析Kafka的存儲機制,選舉機制,備份機制等內容的實現細節。最后,歡迎大家使用Kafka-Eagle監控工具。
7.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!