Kafka 存儲機制和副本


1.概述

  Kafka 快速穩定的發展,得到越來越多開發者和使用者的青睞。它的流行得益於它底層的設計和操作簡單,存儲系統高效,以及充分利用磁盤順序讀寫等特性,和其實時在線的業務場景。對於Kafka來說,它是一個分布式的,可分區的,多副本,多訂閱者的,基於Zookeeper統一協調的分布式日志系統。常見的可以用於系統日志,業務日志,消息數據等。那今天筆者給大家分析Kafka的存儲機制和副本的相關內容。

2.Replication

  Replication是Kafka的重要特性之一,針對其Kafka Brokers進行自動調優Replication數,是比較有難度的。原因之一在於要知道怎么避免Follower進入和退出同步 ISR (In-Sync Replicas)。再消息生產的過程當中,在有一大批海量數據寫入時,可能會引發Broker告警。如果某些Topic的部分Partition長期處於 “under replicated”,這樣是會增加丟失數據的幾率的。Kafka 通過多副本機制實現高可用,確保當Kafka集群中某一個Broker宕機的情況下,仍然可用。而 Kafka 的復制算法保證,如果Leader發生故障或者宕機,一個新的Leader會被重新選舉出來,並對外提供服務,供客戶端寫入消息。Kafka 在同步的副本列表中選舉一個副本為Leader。

  在Topic中,每個分區有一個預寫式日志文件,每個分區都由一系列有序,不可變的消息組成,這些消息被連續的追加到分區中,分區中的每個消息都包含一個連續的序列號,即:offset。它用於確定在分區中的唯一位置。如下圖所示:

  在Kafka中,假如每個Topic的分區有N個副本,由於Kafka通過多副本機制實現故障自動轉移,這里需要說明的是,當KafkaController出現故障,進而不能繼續管理集群,則那些KafkaController Follower開始競選新的Leader,而啟動的過程則是在KafkaController的startup方法中完成的,如下所示:

 def startup() = {
    inLock(controllerContext.controllerLock) {
      info("Controller starting up")
      registerSessionExpirationListener()
      isRunning = true
      controllerElector.startup
      info("Controller startup complete")
    }
  }

  然后啟動ZookeeperLeaderElector,在創建臨時節點,進行session檢查,更新leaderId等操作完成后,會調用故障轉移函數onBecomingLeader,也就是KafkaController中的onControllerFailover方法,如下所示:

def onControllerFailover() {
    if(isRunning) {
      info("Broker %d starting become controller state transition".format(config.brokerId))
      readControllerEpochFromZookeeper()
      incrementControllerEpoch(zkUtils.zkClient)

      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
      registerReassignedPartitionsListener()
      registerIsrChangeNotificationListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()

      initializeControllerContext()

      // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
      // are started. The is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
      // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
      // partitionStateMachine.startup().
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

      replicaStateMachine.startup()
      partitionStateMachine.startup()

      // register the partition change listeners for all existing topics on failover
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }

  正因為有這樣的機制存在,所示當Kafka集群中的某個Broker宕機后,仍然保證服務是可用的。在Kafka中發生復制操作時,確保分區的預寫式日志有序的寫到其他節點,在N個復制因子中,其中一個復制因子角色為Leader,那么其他復制因子的角色則為Follower,Leader處理分區的所有讀寫請求,同時,Follower會被動的定期去復制Leader上的數據。以上分析可以總結為以下幾點,如下所示:

  • Leader負責處理分區的所有讀寫請求。
  • Follower會復制Leader上數據。
  • Kafka 的故障自動轉移確保服務的高可用。

3.存儲

  對於消息對應的性能評估,其文件存儲機制設計是衡量的關鍵指標之一,在分析Kafka的存儲機制之前,我們先了解Kafka的一些概念:

  • Broker:Kafka消息中間件節點,一個節點代表一個Broker,多個Broker可以組建成Kafka Brokers,即:Kafka集群。
  • Topic:消息存儲主題,即可以理解為業務數據名,Kafka Brokers能夠同時負責多個Topic的處理。
  • Partition:針對於Topic來說的,一個Topic上可以有多個Partition,每個Partition上的數據是有序的。
  • Segment:對於Partition更小粒度,一個Partition由多個Segment組成。
  • Offset:每個Partition上都由一系列有序的,不可變的消息組成,這些消息被連續追加到Partition中。而在其中有一個連續的序列號offset,用於標識消息的唯一性。

3.1 Topic存儲

  在Kafka文件存儲中,同一個Topic下有多個不同的Partition,每個Partition為一個單獨的目錄,Partition的命名規則為:Topic名稱+有序序號,第一個Partition序號從0開始,序號最大值等於Partition的數量減1,如下圖所示:

3.2 分區文件存儲

  每個分區相當於一個超大的文件被均分到多個大小相等的Segment數據文件中,但是每個Segment消息數量不一定相等,正因為這種特性的存在,方便了Old Segment File快速被刪除。而對於每個分區只需要支持順序讀寫即可,Segment文件生命周期由服務端配置的參數決定。這樣即可快速刪除無用數據文件,有效提高磁盤利用率。

3.3 Segment文件存儲

  這里,Segment文件由Index File和Data File組成,文件是一一對應的,后綴為 .index 表示索引文件, .log 表示數據文件,如下圖所示:

  如上圖所示,Segment文件命名規則由分區全局第一個Segment從0開始,后續每一個Segment文件名為上一個Segment文件最后一個消息的Offset值。這里Segment數據文件由許多消息組成,消息物理結構如下所示:

Key Describer
offset 用於標識每個分區中每條消息的唯一性,Offset的數值標識該分區的第幾條消息
message Size 消息大小
CRC32 用CRC32校驗消息
“magic” 當前發布Kafka服務程序的協議版本號
“attribute” 獨立版本,或標識壓縮類型,或者編碼類型
key length key的長度
key 可選
payload length 實際消息數據

3.4 分區中查找消息

  在分區中,可以通過offset偏移量來查找消息,如上圖中,文件00000000000046885905.index的消息起始偏移量為46885906=46885905+1,其他文件依此類推,以起始偏移量命名並排序這些文件,這樣能夠快速的定位到具體的文件。通過segment file,當offset為46885906時,我們可以定位到00000000000046885905.index元數據物理位置和00000000000046885905.log物理偏移地址。

4.總結

  通過對副本和存儲機制的分析,我們可以清楚的知道,Kafka通過自動故障轉移來確保服務的高可用,Leader負責分區的所有讀寫操作,Follower會復制Leader上的數據。Kafka針對Topic,使某一個分區中的大文件分成多個小文件,通過多個小的segment file,使之便捷定期清理或刪除已經消費的文件,減少磁盤占用。另外,通過索引文件稀疏存儲,可以大幅度降低索引文件元數據所占用的空間。

5.結束語

  這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM