Zookeeper中Kafka相關信息的存儲


zookeeper 是應用廣泛的分布式服務協調組件,它對於大數據領域的其他組件,如HFDS、Yarn、Hbase、Kafka等,都扮演着基礎角色

在kafka.utils.ZKUtils對象的開頭,預先定義了很多ZK路徑,如:

object ZkUtils extends scala.AnyRef {
  val AdminPath : java.lang.String = { /* compiled code */ }
  val BrokersPath : java.lang.String = { /* compiled code */ }
  val ClusterPath : java.lang.String = { /* compiled code */ }
  val ConfigPath : java.lang.String = { /* compiled code */ }
  val ControllerPath : java.lang.String = { /* compiled code */ }
  val ControllerEpochPath : java.lang.String = { /* compiled code */ }
  val IsrChangeNotificationPath : java.lang.String = { /* compiled code */ }
  val LogDirEventNotificationPath : java.lang.String = { /* compiled code */ }
  val KafkaAclPath : java.lang.String = { /* compiled code */ }
  val KafkaAclChangesPath : java.lang.String = { /* compiled code */ }
  val ConsumersPath : java.lang.String = { /* compiled code */ }
  val ClusterIdPath : scala.Predef.String = { /* compiled code */ }
  val BrokerIdsPath : scala.Predef.String = { /* compiled code */ }
  val BrokerTopicsPath : scala.Predef.String = { /* compiled code */ }
  val ReassignPartitionsPath : scala.Predef.String = { /* compiled code */ }
  val DeleteTopicsPath : scala.Predef.String = { /* compiled code */ }
  val PreferredReplicaLeaderElectionPath : scala.Predef.String = { /* compiled code */ }
  val BrokerSequenceIdPath : scala.Predef.String = { /* compiled code */ }
  val ConfigChangesPath : scala.Predef.String = { /* compiled code */ }
  val ConfigUsersPath : scala.Predef.String = { /* compiled code */ }
  val ConfigBrokersPath : scala.Predef.String = { /* compiled code */ }
  val ProducerIdBlockPath : java.lang.String = { /* compiled code */ }
  val SecureZkRootPaths : scala.collection.Seq[java.lang.String] = { /* compiled code */ }
  val SensitiveZkRootPaths : scala.collection.Seq[scala.Predef.String] = { /* compiled code */ }
........
}

 

可以通過ZK命令行或可視化工具來觀察這些路徑下面的存儲情況

1、Broker注冊信息

路徑為/brokers/ids/[broker_id],其中存儲的數據示例如下。

{
  "listener_security_protocol_map": {
    "PLAINTEXT": "PLAINTEXT"
  },
  "endpoints": ["PLAINTEXT://hadoop100:9092"],
  "jmx_port": 9393,
  "host": "hadoop100",
  "timestamp": "1554349917296",
  "port": 9092,
  "version": 4
}

 

  • jmx_port:JMX端口號。
  • host:所在主機名或IP地址。
  • timestamp:啟動時的時間戳。
  • port:開放的TCP端口號。
  • version:版本號。以下所有version值均是代表版本號,不再贅述。

當Kafka集群中有節點上下線時,這個路徑下的數據就會更新。

2、Topic信息

路徑為/brokers/topics/[topic_name], 其中存儲的數據示例如下。

{
  "version": 1,
  "partitions": {
    "1": [106],
    "0": [105],
    "2": [107],
  }
}
  • partitions:topic中各個partition的ID,以及對應的ISR中各個broker的ID的列表

當有topic被創建或刪除,以及partition發生變更時,

通過對topic以及節點變更注冊監聽,就能實現producer的負載均衡

在/admin/delete_topics下還保存有已經標記為刪除的topic名稱(只有名稱,沒有其他數據)

/config/topics/[topic_name]下保存有各個topic的自定義配置

partition狀態信息路徑/brokers/topics/[topic_name]/partitions/[partition_id]/state,其中存儲的數據如下:

{
  "controller_epoch": 17,
  "leader": 105,
  "version": 1,
  "leader_epoch": 2,
  "isr": [105]
}
  • controller_epoch:controller的紀元(代數),即集群重新選舉controller的次數
  • leader:當前partition的leader的broker ID
  • leader_epoch:partition leader的紀元(代數),即當前partition重新選舉leader的次數
  • isr:該partition對應的ISR中各個broker ID的列表

3、Controller注冊信息

當前controller信息的路徑就是/controller,其中存儲的數據示例如下。

{
  "version": 1,
  "brokerid": 104,
  "timestamp": "1554349916898"
}
  • brokerid:現在集群中Controller的節點ID
  • timestamp:最近一次Controller變化的時間戳

如果Controller信息節點被刪除,就會觸發集群重新選舉Controller。zk對選主操作有天然的支持

在在/controller_epoch路徑下還保存有controller的紀元值,與partition狀態信息中的值相同。沒重選舉一次,該值就會加1

4、consumer訂閱信息

consumer本身的信息路徑為/consumers/[group_id]/ids/[consumer_id],其中存儲的數據示例如下。

{
  "version": 1,
  "subscription": {
    "bl_mall_orders": 1
  },
  "pattern": "white_list",
  "timestamp": "1558617131642"
}
  • subscription:訂閱topic名稱,及該topic對應消息流個數的映射
  • parttern:訂閱方式,可取值靜態(static)、白名單(white_list)、黑名單(black_list)
  • timestamp:consumer創建時的時間戳

通過zk維護的consumer及consumer group信息,可以實現消費者負載均衡

/consumers/[group_id]/offsets/[topic_name]/[partition_id]下存儲有consumer group對應各個topic及paritition的消費偏移量

/consumers/[group_id]/owners/[topic_name]/[partition_id]下存儲有consumer group對應各個topic及partition的消費者線程。

5、最優replica選舉信息

當由於節點宕機等原因使得partition leader變得不再均勻分布時,可以使用kafka提供的kafka-preferred-replica-election工具重新將partition創建時的最優replica(前提是在ISR內)選舉為leader

也可以開啟leader自動平衡的功能(auto.leader.rebalance.enable

當正在選舉最優replica時,zk中會創建/admin/preferred_replica_election節點,其中存儲着需要調整最優replica的partition信息,示例數據如下。

{
  "version": 1,
  "partitions": [
    {
      "topic": "bl_mall_orders",
      "partition": 1
    },
    {
      "topic": "bl_mall_products",
      "partition": 0
    }
  ]
}

6、paritition重分配信息

與上面的kafka-preferred-replica-election工具類似,Kafka還提供了kafka-reassign-partitions工具,但它的功能更為強大。

它重新分配partition的所有leader和follower的位置,甚至更改replica數量。

當集群擴容或follower分布也不均勻時,就可以利用它。

該工具會生成JSON格式的重分配計划,並存入zk中/admin/reassign_partitions節點,示例數據如下。

{
  "version": 1,
  "partitions": [
    {
      "topic": "bl_mall_wish",
      "partition": 1,
      "replicas": [0, 1, 3]
    }
  ]
}

7、ISR變更通知信息

各個paritition的ISR集合並不是一成不變的。

當ISR發生變化(如replica超時),Controller會將發生變得哪個partition存入/isr_change_notification/[isr_change_x]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM