zookeeper 是應用廣泛的分布式服務協調組件,它對於大數據領域的其他組件,如HFDS、Yarn、Hbase、Kafka等,都扮演着基礎角色
在kafka.utils.ZKUtils對象的開頭,預先定義了很多ZK路徑,如:
object ZkUtils extends scala.AnyRef { val AdminPath : java.lang.String = { /* compiled code */ } val BrokersPath : java.lang.String = { /* compiled code */ } val ClusterPath : java.lang.String = { /* compiled code */ } val ConfigPath : java.lang.String = { /* compiled code */ } val ControllerPath : java.lang.String = { /* compiled code */ } val ControllerEpochPath : java.lang.String = { /* compiled code */ } val IsrChangeNotificationPath : java.lang.String = { /* compiled code */ } val LogDirEventNotificationPath : java.lang.String = { /* compiled code */ } val KafkaAclPath : java.lang.String = { /* compiled code */ } val KafkaAclChangesPath : java.lang.String = { /* compiled code */ } val ConsumersPath : java.lang.String = { /* compiled code */ } val ClusterIdPath : scala.Predef.String = { /* compiled code */ } val BrokerIdsPath : scala.Predef.String = { /* compiled code */ } val BrokerTopicsPath : scala.Predef.String = { /* compiled code */ } val ReassignPartitionsPath : scala.Predef.String = { /* compiled code */ } val DeleteTopicsPath : scala.Predef.String = { /* compiled code */ } val PreferredReplicaLeaderElectionPath : scala.Predef.String = { /* compiled code */ } val BrokerSequenceIdPath : scala.Predef.String = { /* compiled code */ } val ConfigChangesPath : scala.Predef.String = { /* compiled code */ } val ConfigUsersPath : scala.Predef.String = { /* compiled code */ } val ConfigBrokersPath : scala.Predef.String = { /* compiled code */ } val ProducerIdBlockPath : java.lang.String = { /* compiled code */ } val SecureZkRootPaths : scala.collection.Seq[java.lang.String] = { /* compiled code */ } val SensitiveZkRootPaths : scala.collection.Seq[scala.Predef.String] = { /* compiled code */ } ........ }
可以通過ZK命令行或可視化工具來觀察這些路徑下面的存儲情況
1、Broker注冊信息
路徑為/brokers/ids/[broker_id]
,其中存儲的數據示例如下。
{ "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT" }, "endpoints": ["PLAINTEXT://hadoop100:9092"], "jmx_port": 9393, "host": "hadoop100", "timestamp": "1554349917296", "port": 9092, "version": 4 }
- jmx_port:JMX端口號。
- host:所在主機名或IP地址。
- timestamp:啟動時的時間戳。
- port:開放的TCP端口號。
- version:版本號。以下所有version值均是代表版本號,不再贅述。
當Kafka集群中有節點上下線時,這個路徑下的數據就會更新。
2、Topic信息
路徑為/brokers/topics/[topic_name]
, 其中存儲的數據示例如下。
{ "version": 1, "partitions": { "1": [106], "0": [105], "2": [107], } }
- partitions:topic中各個partition的ID,以及對應的ISR中各個broker的ID的列表
當有topic被創建或刪除,以及partition發生變更時,
通過對topic以及節點變更注冊監聽,就能實現producer的負載均衡
在/admin/delete_topics下還保存有已經標記為刪除的topic名稱(只有名稱,沒有其他數據)
在/config/topics/[topic_name]
下保存有各個topic的自定義配置
partition狀態信息路徑/brokers/topics/[topic_name]/partitions/[partition_id]/state
,其中存儲的數據如下:
{ "controller_epoch": 17, "leader": 105, "version": 1, "leader_epoch": 2, "isr": [105] }
- controller_epoch:controller的紀元(代數),即集群重新選舉controller的次數
- leader:當前partition的leader的broker ID
- leader_epoch:partition leader的紀元(代數),即當前partition重新選舉leader的次數
- isr:該partition對應的ISR中各個broker ID的列表
3、Controller注冊信息
當前controller信息的路徑就是/controller
,其中存儲的數據示例如下。
{ "version": 1, "brokerid": 104, "timestamp": "1554349916898" }
- brokerid:現在集群中Controller的節點ID
- timestamp:最近一次Controller變化的時間戳
如果Controller信息節點被刪除,就會觸發集群重新選舉Controller。zk對選主操作有天然的支持
在在/controller_epoch
路徑下還保存有controller的紀元值,與partition狀態信息中的值相同。沒重選舉一次,該值就會加1
4、consumer訂閱信息
consumer本身的信息路徑為/consumers/[group_id]/ids/[consumer_id]
,其中存儲的數據示例如下。
{ "version": 1, "subscription": { "bl_mall_orders": 1 }, "pattern": "white_list", "timestamp": "1558617131642" }
- subscription:訂閱topic名稱,及該topic對應消息流個數的映射
- parttern:訂閱方式,可取值靜態(static)、白名單(white_list)、黑名單(black_list)
- timestamp:consumer創建時的時間戳
通過zk維護的consumer及consumer group信息,可以實現消費者負載均衡
在/consumers/[group_id]/offsets/[topic_name]/[partition_id]
下存儲有consumer group對應各個topic及paritition的消費偏移量
在/consumers/[group_id]/owners/[topic_name]/[partition_id]
下存儲有consumer group對應各個topic及partition的消費者線程。
5、最優replica選舉信息
當由於節點宕機等原因使得partition leader變得不再均勻分布時,可以使用kafka提供的kafka-preferred-replica-election
工具重新將partition創建時的最優replica(前提是在ISR內)選舉為leader
也可以開啟leader自動平衡的功能(auto.leader.rebalance.enable
)
當正在選舉最優replica時,zk中會創建/admin/preferred_replica_election
節點,其中存儲着需要調整最優replica的partition信息,示例數據如下。
{ "version": 1, "partitions": [ { "topic": "bl_mall_orders", "partition": 1 }, { "topic": "bl_mall_products", "partition": 0 } ] }
6、paritition重分配信息
與上面的kafka-preferred-replica-election
工具類似,Kafka還提供了kafka-reassign-partitions
工具,但它的功能更為強大。
它重新分配partition的所有leader和follower的位置,甚至更改replica數量。
當集群擴容或follower分布也不均勻時,就可以利用它。
該工具會生成JSON格式的重分配計划,並存入zk中/admin/reassign_partitions
節點,示例數據如下。
{ "version": 1, "partitions": [ { "topic": "bl_mall_wish", "partition": 1, "replicas": [0, 1, 3] } ] }
7、ISR變更通知信息
各個paritition的ISR集合並不是一成不變的。
當ISR發生變化(如replica超時),Controller會將發生變得哪個partition存入/isr_change_notification/[isr_change_x]
中