Kafka 源代碼分析.


這里記錄kafka源代碼筆記.(代碼版本是0.8.2.1)

kafka的源代碼如何下載.這里簡單說一下.

  git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka

  通過官網給出的鏈接clone項目代碼之后.一般只能得到trunk版本的代碼.想要之前的版本代碼需要自己checkout下來.

  git branch -a 能看見所有的分支.或者用git tag  --list 也可以看見所有的tag標簽.

  想得到其中一個的源代碼的方法就是用git checkout -t xxxx(分支名稱或者標簽名稱).完成之后就能在git branch下看見各個分支了.

  之后使用git checkout 0.8.2就可以切換到對應的版本上去了.

這里簡單說一下源代碼目錄:

  源代碼目錄下的clients目錄下是javaapi的源代碼.

  core目錄下的源代碼是server端的所有實現.包含一下目錄和文件.

admin           #一些管理功能的代碼.topiccommand也在這下面.
api             #跟client通信協議定義的地方
client          #客戶端工具
cluster         #集群的相關實現
common          #常用庫
consumer        #消費端管理的實現
controller      #中央控制器的實現
javaapi         #javaapi的實現
Kafka.scala     #kafka入口庫
log             #log管理的相關實現
message         #消息相關
metrics         #...
network         #...
producer
serializer
server          #server相關的實現.kafkaServer就在這里.
tools
utils

 

這里不再從kafka啟動順序說起.網上已經一堆kafka啟動順序和框架上的文章了.這里不再羅嗦了,主要詳細說一下代碼細節部分.細節部分會一直讀一直補充.如果想看看kafka 框架及啟動順序之類的文章推薦下面這個鏈接.

http://www.cnblogs.com/davidwang456/p/5173486.html

這個鏈接作者貼上了框架圖和代碼.比較清晰.

 

配置文件解析等工作都在KafkaConfig.scala中實現.這個類文件在server目錄下.下文使用的配置文件項都是由這個類提供.這個類繼承ZKConfig類實現.ZKConfig在Utils.scala文件中.功能是將zookeeper相關參數分離出來.

 

1,首先從kafkaScheduler.start()啟動說起.

  KafkaScheduler是同名類的實例對象.類源文件在源代碼目錄的utils目錄下.這個對象的初始化是在KafkaServer 類的開頭部分.實例化參數是config.backgroundThreads.這個參數對應的是配置文件中的background.threads參數.默認是4.

  KafkaScheduler的是封裝了ScheduldThreadPoolExecutor,在這個類的start函數里實例化stpe類.並設置了在執行器關閉的時候不再執行現有和周期性任務.通過setThreadFactory 設置了自己的工廠函數.工廠函數里調用了utils目錄下的Utils類中的newThread方法.做了一些記錄性工作.

  KafkaScheduler類同時實現了scheduler方法.這個方法是封裝了stpe的scheduler的方法.用來傳遞需要執行的方法.

  shutdown函數也是封裝了stpe的shutdown和awaitshutdown方法實現的.

  這個對象通過createlogmanager函數傳遞到LogManager類中.在這個類中的start方法里調用scheduler方法將logmanager的任務放入線程池中.logmanager具體任務是log保留,log刷新,log檢查這三個.這三個任務的詳細情況將在LogManager部分解釋.

2,之后是通過Zkinit()建立了zkclient對象.

  zkinit()函數做的就是通過ZkClient初始化zkclient對象,並且做一些kafka在zookeeper里路徑設置的工作.

  首先檢查config.Zkconnect是否指定了zookeeper中的根路徑.ZKconnect就是配置文件中的zookeeper.connect參數.這個參數可以在ip_list后面加上/xxx/xxx這種指定的zookeeper工作路徑.如:192.168.1.100,127.0.0.1:2181/kafka/dir,代碼中直接使用substr(index('/'))的方式截取chroot.如果未設置則直接使用ZkUtils.setupCommonPaths(zkClient)來創建kafka的所有應用目錄.具體應用目錄可以見ZkUtils.scala文件中.這個文件在utils目錄下.

  不過這個函數代碼有點小疑問,這個疑問先暫時保留.也可能是我未看完全部代碼導致的.

private def initZk(): ZkClient = {
    info("Connecting to zookeeper on " + config.zkConnect)
//一般配置zookeeper.connect選項的時候很少會在后面跟路徑.但是當跟路徑之后chroot就會是追加的路徑.
    val chroot = {
      if (config.zkConnect.indexOf("/") > 0)
        config.zkConnect.substring(config.zkConnect.indexOf("/"))
      else
        ""
    }
//chroot是追加路徑的時候.會執行下面這段代碼.
    if (chroot.length > 1) {  //疑問在這個地方.
      val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))//這是取出zookeeper ip list
      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)//然后建立zkclient對象.
      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)//之后通過這個方法創建這個路徑.
      info("Created zookeeper path " + chroot)//打印一個create info
      zkClientForChrootCreation.close()//這里清理這個追加路徑的zkclient對象.
    }
//做完這些事之后.又正常的用zkconnect來建立一個zkClient對象........如果chroot.length>1的話.這個時候zkconnect應該是跟if塊里的一樣需要提取ip list才對.
//如果一開始就沒有追加路徑的話.這里是沒有任何問題的. val zkClient
= new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.setupCommonPaths(zkClient) //這個函數是調用ZkUtils.makeSurePersistentPathExists函數來建立kafka路徑.
zkClient }

def setupCommonPaths(zkClient: ZkClient) { //可以看見這個函數是如何工作的
  for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
  makeSurePersistentPathExists(zkClient, path) //以上這些路徑是提前定義好的.定義如下.
}

 
         

val ConsumersPath = "/consumers"  //這些路徑是在ZkUtils類開頭定義的.可以看見這個定義里無論你是否追加了chroot.也沒有任何影響.
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val TopicConfigPath = "/config/topics"
val TopicConfigChangesPath = "/config/changes"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"

 

   因為是版本0.8.2.1的代碼所以我又去對照trunk版本的代碼看了一下.這個函數變了.不是直接調用zkclient實現是調用zkutils實現得了.

 

后續的日志管理,socket服務,副本管理,中央控制等等分析都會在后續篇章里繼續分析.

 

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM