一、前言
上一篇博文講解了Zookeeper的典型應用場景,在大數據時代,各種分布式系統層出不窮,其中,有很多系統都直接或間接使用了Zookeeper,用來解決諸如配置管理、分布式通知/協調、集群管理和Master選舉等一系列分布式問題。
二、 Hadoop
Hadoop的核心是HDFS(Hadoop Distributed File System)和MapReduce,分別提供了對海量數據的存儲和計算能力,后來,Hadoop又引入了全新MapReduce框架YARN(Yet Another Resource Negotiator)。在Hadoop中,Zookeeper主要用於實現HA(High Availability),這部分邏輯主要集中在Hadoop Common的HA模塊中,HDFS的NameNode與YARN的ResourceManager都是基於此HA模塊來實現自己的HA功能,YARN又使用了Zookeeper來存儲應用的運行狀態。
YARN
YARN是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和調度,它的引入為集群在利用率、資源統一管理和數據共享等方面帶來了巨大好處。其可以支持MapReduce模型,同時也支持Tez、Spark、Storm、Impala、Open MPI等。
YARN主要由ResourceManager(RM)、NodeManager(NM)、ApplicationManager(AM)、Container四部分構成。其中,ResourceManager為全局資源管理器,負責整個系統的資源管理和分配。由YARN體系架構可以看到ResourceManager的單點問題,ResourceManager的工作狀況直接決定了整個YARN架構是否可以正常運轉。
ResourceManager HA
為了解決ResourceManager的單點問題,YARN設計了一套Active/Standby模式的ResourceManager HA架構。
由上圖可知,在運行期間,會有多個ResourceManager並存,並且其中只有一個ResourceManager處於Active狀態,另外一些(允許一個或者多個)則處於Standby狀態,當Active節點無法正常工作時,其余處於Standby狀態的節點則會通過競爭選舉產生新的Active節點。
主備切換
ResourceManager使用基於Zookeeper實現的ActiveStandbyElector組件來確定ResourceManager的狀態。具體步驟如下
1. 創建鎖節點。在Zookeeper上會有一個類似於/yarn-leader-election/pseudo-yarn-rm-cluster的鎖節點,所有的ResourceManager在啟動時,都會去競爭寫一個Lock子節點(/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock),子節點類型為臨時節點,利用Zookeeper的特性,創建成功的那個ResourceManager切換為Active狀態,其余的為Standby狀態。
2. 注冊Watcher監聽。所有Standby狀態的ResourceManager都會向/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock節點注冊一個節點變更監聽,利用臨時節點的特性,能夠快速感知到Active狀態的ResourceManager的運行情況。
3. 主備切換。當Active的ResourceManager無法正常工作時,其創建的Lock節點也會被刪除,此時,其余各個Standby的ResourceManager都會收到通知,然后重復步驟1。
隔離(Fencing)
在分布式環境中,經常會出現諸如單機假死(機器由於網絡閃斷或是其自身由於負載過高,常見的有GC占用時間過長或CPU負載過高,而無法正常地對外進行及時響應)情況。假設RM集群由RM1和RM2兩台機器構成,某一時刻,RM1發生了假死,此時,Zookeeper認為RM1掛了,然后進行主備切換,RM2會成為Active狀態,但是在隨后,RM1恢復了正常,其依然認為自己還處於Active狀態,這就是分布式腦裂現象,即存在多個處於Active狀態的RM工作,可以使用隔離來解決此類問題。
YARN引入了Fencing機制,借助Zookeeper的數據節點的ACL權限控制機制來實現不同RM之間的隔離。在上述主備切換時,多個RM之間通過競爭創建鎖節點來實現主備狀態的確定,此時,只需要在創建節點時攜帶Zookeeper的ACL信息,目的是為了獨占該節點,以防止其他RM對該節點進行更新。
還是上述案例,若RM1出現假死,Zookeeper會移除其創建的節點,此時RM2會創建相應的鎖節點並切換至Active狀態,RM1恢復之后,會試圖去更新Zookeeper相關數據,但是此時其沒有權限更新Zookeeper的相關節點數據,因為節點不是由其創建的,於是就自動切換至Standby狀態,這樣就避免了腦裂現象的出現。
ResourceManager狀態存儲
在ResourceManager中,RMStateStore可以存儲一些RM的內部狀態信息,包括Application以及Attempts信息、Delegation Token及Version Information等,值得注意的是,RMStateStore的絕大多數狀態信息都是不需要持久化存儲的(如資源使用情況),因為其很容易從上下文信息中重構,,在存儲方案設計中,提供了三種可能的實現。
1. 基於內存實現,一般用於日常開發測試。
2. 基於文件系統實現,如HDFS。
3. 基於Zookeeper實現。
由於存儲的信息不是特別大,Hadoop官方建議基於Zookeeper來實現狀態信息的存儲,在Zookeeper中,ResourceManager的狀態信息都被存儲在/rmstore這個根節點下,其數據結構如下。
在RMAppRoot節點下存儲的是與各個Application相關的信息,RMDTSecretManagerRoot存儲的是與安全相關的Token信息。每個Active狀態的ResourceManager在初始化節點都會從Zookeeper上讀取到這些信息,並根據這些狀態信息繼續后續的處理。
三、HBase
HBase(Hadoop Database)是一個基於Hadoop的文件系統設計的面向海量數據的高可靠、高性能、面向列、可伸縮的分布式存儲系統,其針對數據寫入具有強一致性,索引列也實現了強一致性,其采用了Zookeeper服務來完成對整個系統的分布式協調工作,其架構如下
HBase架構中,Zookeeper是串聯起HBase集群與Client的關鍵所在,Zookeeper在HBase中的系統容錯、RootRegion管理、Region狀態管理、分布式SplitLog任務管理、Replication管理都扮演重要角色。
系統容錯
在HBase啟動時,每個RegionServer服務器都會到Zookeeper的/hbase/rs節點下創建一個信息節點,例如/hbase/rs/[Hostname],同時,HMaster會對這個節點進行監聽,當某個RegionServer掛掉時,Zookeeper會因為在一段時間內無法接收其心跳信息(Session失效),而刪除掉該RegionServer服務器對應的節點,與此同時,HMaster則會接收到Zookeeper的NodeDelete通知,從而感知到某個節點斷開,並立即開始容錯工作,HMaster會將該RegionServer所處理的數據分片(Region)重新路由到其他節點上,並記錄到Meta信息中供客戶端查詢。
RootRegion管理
數據存儲的位置信息是記錄在元數據分片上的,即在RootRegion上,每次客戶端發起新的請求,就會查詢RootRegion來確定數據的位置,而RootRegion自身的位置則記錄在Zookeeper上(默認情況下在/hbase/root-region-server節點中)。當RootRegion發生變化時,如Region手工移動、Balance或者是RootRegion所在服務器發生了故障,就能通過Zookeeper來感知到這一變化並做出一系列相應的容災措施,從而保障客戶端總能夠拿到正確的RootRegion信息。
Region狀態管理
Region是HBase中數據的物理切片,每個Region記錄了全局數據的一小部分,並且不同的Region之間的數據是相互不重復的。但對於一個分布式系統來說,Region是會發生變更的,原因可能是系統故障、負載均衡、配置修改、Region分裂合並等,一旦Region發生變動,它必然經歷離線和重新在線的過程。在離線期間,數據是不能被訪問的,並且Region的狀態變化必須讓全局知曉,否則可能會出現某些事務性的異常,而對於HBase集群而言,Region的數量可能會多達10萬級別,因此這樣規模的Region狀態管理只有依靠Zookeeper才能做到。
分布式SplitLog任務管理
當某台RegionServer服務器掛掉后,由於總有一部分新寫入的數據還沒有持久化到HFile中(在內存中),因此在遷移該RegionServer的服務時,應該從HLog中恢復這部分還在內存中的數據,此時HMaster需要遍歷該RegionServer服務器的HLog(SplitLog),並按照Region切分成小塊移動到新的地址,並進行數據的Replay。由於單個RegionServer的日志量相對龐大(可能存在上千個Region,上GB的日志),而用戶往往希望系統能夠快速完成日志的恢復工作,因此,需要將處理HLog的任務分配給多台RegionServer服務器共同處理,而這又需要一個持久化組件來輔助HMaster完成任務的分配,當前做法如下,HMaster會在Zookeeper上創建一個splitlog節點(默認為/hbase/splitlog節點),將"哪個RegionServer處理哪個Region"的信息以列表的形式存放在該節點上,然后由各個RegionServer服務器自行到該節點上去領取任務並在任務執行成功或失敗后再更新該節點的信息以通知HMaster繼續后續步驟,Zookeeper起到了相互通知和信息持久化的角色。
Replication管理
Replication是實現HBase中主備集群間的實時同步的重要模塊,與傳統關系型數據庫不同的是,HBase的Replication是多對多的,且每個節點隨時都有可能掛掉,因此其會復雜得多。HBase也借助了Zookeeper來完成Replication功能,做法是在Zookeeper上記錄一個replication節點(默認是/hbase/replication節點),然后把不同的RegionServer服務器對應的HLog文件名稱記錄到相應的節點上,HMaster集群會將新增的數據推送給Slave集群,並同時將推送信息記錄到Zookeeper上(稱為斷點記錄),然后重復上述步驟,當服務器掛掉時,由於Zookeeper上已經保存了斷點信息,因此只要有HMaster能夠根據這些信息來協同用來推送HLog數據的主節點服務器就可以進行繼續復制操作。
四、Kafka
kafka是一個吞吐量極高的分布式消息系統,其整體設計是典型的發布與訂閱系統模式,在Kafka集群中,沒有中心主節點概念,所有服務器都是對等的,因此,可以在不做任何配置更改的情況下實現服務器的添加與刪除,同樣,消息的生產者和消費者也能夠隨意重啟和機器的上下線。
生產者(Producer):消息產生的源頭,負責生成消息並發送到Kafka服務器。
消費者(Consumer):消息的使用方,負責消費Kafka服務器上的消息。
主題(Topic):由用戶定義並配置在Kafka服務端,用於建立生產者和消費者之間的訂閱關系,生產者發送消息到指定Topic下,消費者從這個Topic中消費消息。
消息分區(Partition):一個Topic下會分為多個分區,如"kafka-test"這個Topic可以分為10個分區,分別由兩台服務器提供,那么通常可以配置讓每台服務器提供5個分區,假設服務器ID為0和1,那么分區為0-0、0-1、0-2、0-3、0-4和1-0、 1-1、1-2、1-3、1-4。消息分區機制和分區的數量與消費者的負載均衡機制有很大的關系。
服務器(Broker):用於存儲信息,在消息中間件中通常被稱為Broker。
消費者分組(Group):歸組同類消費者,多個消費者可以共同消費一個Topic下的消息,每個消費者消費其中的部分消息,這些消費者組成了消費者分組,擁有同一個分組名稱,通常也被稱為消費者集群。
偏移量(Offset):消息存儲在Kafka的Broker上,消費者拉取消息數據的過程中需要知道消息在文件中的偏移量。
Broker注冊
Broker是分布式部署並且相互之間相互獨立,但是需要有一個注冊系統能夠將整個集群中的Broker管理起來,此時就使用到了Zookeeper。在Zookeeper上會有一個專門用來進行Broker服務器列表記錄的節點/brokers/ids。每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N]。Kafka使用了全局唯一的數字來指代每個Broker服務器,不同的Broker必須使用不同的Broker ID進行注冊,創建完節點后,每個Broker就會將自己的IP地址和端口信息記錄到該節點中去。其中,Broker創建的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。
Topic注冊
在Kafka中,同一個Topic的消息會被分成多個分區並將其分布在多個Broker上,這些分區信息及與Broker的對應關系也都是由Zookeeper在維護,由專門的節點來記錄,如/borkers/topics。Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker服務器啟動后,會到對應Topic節點(/brokers/topics)上注冊自己的Broker ID並寫入針對該Topic的分區總數,如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker服務器,對於"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
生產者負載均衡
由於同一個Topic消息會被分區並將其分布在多個Broker上,因此,生產者需要將消息合理地發送到這些分布式的Broker上,那么如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。
① 四層負載均衡,根據生產者的IP地址和端口來為其確定一個相關聯的Broker。通常,一個生產者只會對應單個Broker,然后該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多於其他生產者的話,那么會導致不同的Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到Broker的新增和刪除。
② 使用Zookeeper進行負載均衡,由於每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker服務器列表的變更,這樣就可以實現動態的負載均衡機制。
消費者負載均衡
與生產者類似,Kafka中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的Broker服務器上接收消息,每個消費者分組包含若干消費者,每條消息都只會發送給分組中的一個消費者,不同的消費者分組消費自己特定的Topic下面的消息,互不干擾。
消費分區與消費者的關系
對於每個消費者分組,Kafka都會為其分配一個全局唯一的Group ID,同一個消費者分組內部的所有消費者共享該ID。同時,Kafka為每個消費者分配一個Consumer ID,通常采用"Hostname:UUID"形式表示。在Kafka中,規定了每個消息分區有且只能同時有一個消費者進行消費,因此,需要在Zookeeper上記錄消息分區與消費者之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到對應消息分區的臨時節點上,例如/consumers/[group_id]/owners/[topic]/[broker_id-partition_id],其中,[broker_id-partition_id]就是一個消息分區的標識,節點內容就是該消費分區上消息消費者的Consumer ID。
消息消費進度Offset記錄
在消費者對指定消息分區進行消息消費的過程中,需要定時地將分區消息的消費進度Offset記錄到Zookeeper上,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費后,能夠從之前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑為/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],節點內容就是Offset的值。
消費者注冊
消費者服務器在初始化啟動時加入消費者分組的步驟如下
① 注冊到消費者分組。每個消費者服務器啟動時,都會到Zookeeper的指定節點下創建一個屬於自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建后,消費者就會將自己訂閱的Topic信息寫入該臨時節點。
② 對消費者分組中的消費者的變化注冊監聽。每個消費者都需要關注所屬消費者分組中其他消費者服務器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。
③ 對Broker服務器變化注冊監聽。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現Broker服務器列表發生變化,那么就根據具體情況來決定是否需要進行消費者負載均衡。
④ 進行消費者負載均衡。為了讓同一個Topic下不同分區的消息盡量均衡地被多個消費者消費而進行消費者與消息分區分配的過程,通常,對於一個消費者分組,如果組內的消費者服務器發生變更或Broker服務器發生變更,會發出消費者負載均衡。
負載均衡
Kafka借助Zookeeper上記錄的Broker和消費者信息,采用消費者均衡算法進行負載均衡,其具體步驟如下。假設一個消息分組的每個消費者記為C1,C2,Ci,...,Cn。那么對於消費者Ci,其對應的消息分區分配策略如下:
1. 設置Pr為指定Topic所有的消息分區。
2. 設置Cg為統一消費者分組中的所有消費者。
3. 對Pr進行排序,使分布在同一個Broker服務器上的分區盡量靠在一起。
4. 對Cg進行排序。
5. 設置i為Ci在Cg中的位置索引,同時設置N = size (Pr) / size (Cg)。
6. 將編號為i * N ~ (i + 1) * N - 1的消息分區分配給Ci。
7. 重新更新Zookeeper上消息分區與消費者Ci的關系。
五、總結
本篇博客講解了Zookeeper在大型分布式系統中的應用,也體現了Zookeeper作為一款分布式協調器的優秀特點,至於Zookeeper在各個系統的詳細應用,讀者可以自行查閱資料,也謝謝各位園友的觀看~