Zookeeper的啟動流程
- zookeeper的主類是QuorumPeerMain,啟動時讀取zoo.cfg配置文件,如果沒有配置server列表,則單機模式啟動,否則按集群模式啟動,這里只分析集群模式
- 根據配置初始化quorumPeer對象,並啟動quorumPeer線程,這里主要做了幾件事情
- 讀取保存在磁盤上的數據,包括db的snapshot和txnlog,zookeeper的存儲結構另外專門講述
- 啟動cnxnFactory,這里主要是啟動一個server,用來接收來自client的請求,綁定在配置文件中的clientPort端口
- 在QuorumAddress上綁定一個server,用來和其他zookeeper server做交互
- 啟動leader選舉過程,因為server剛啟動時是存在LOOKING狀態,需要發一起一次選舉過程來獲取leader
- 啟動quorumPeer的主線程run,根據當前節點的狀態來啟動不同的流程
- 如果是Looking狀態,則調用FastLeaderElection::lookForLeader來發起選舉流程
- 如果是OBSERVING狀態,則開始Observer流程
- 如果是FOLLOWING狀態,則開始Follower流程
- 如果是LEADING狀態,則開始Leader流程
Zookeeper的選舉流程
- 發起選舉流程有兩種情況:
1)server剛啟動的時候,server的狀態初始化為LOOKING狀態
2)server發生異常,切換到LOOKING狀態
server有4中狀態
- LOOKING:初始狀態,表示在選舉leader
- FOLLOWING:跟隨leader的角色,參與投票
- LEADING:集群的leader
- OBSERVING:不參與投票,只是同步狀態
- 按剛啟動來講述選舉流程,QuorumPeer::start() -> QuorumPeer::startLeaderElection() -> QuorumPeer::createElectionAlgorithm
- 默認使用FastLeaderElection算法,初始化的流程如下:
- 初始化QuorumCnxManager,管理選舉中和其他server的交互,選舉時監聽在專門的electionAddr上
- QuorumCnxManager是實際發生網絡交互的地方,它的主要數據結構包括
- queueSendMap:sid -> buffer queue,為每個參與投票的server都保留一個隊列
- recvQueue:message queue,所有收到的消息都放到recvQueue
- listener:server主線程,收發消息時和上面兩個隊列交互
QuorumCnxManager可以保證每對peer之間只有一個鏈接,如果有server發起新的鏈接,則比較sid,sid大的保留鏈接,小的放棄鏈接
- 初始化FastLeaderElection,這是選舉邏輯所在的地方,它主要包括3個線程:
- Messenger::WorkerReceiver:從QuorumCnxManager::recvQueue中獲取網絡包,並將其發到FastLeaderElection::recvqueue中
- Messenger::WorkerSender:從FastLeaderElection::sendqueue中獲取網絡包,並將其放到QuorumCnxManager::queueSendMap中,並發送到網絡上
- lookForLeader:QuorumPeer主線程會調用lookForLeader函數,它從recvqueue中獲取別人發給server的選舉數據,並將發給其他server的投票放到sendqueue中
- FastLeaderElection::lookForLeader中實現了選舉算法,具體的流程如下:
- 首先更新選舉周期logicalclock,並把自己作為leader作為投票發給所有其他的server
- 然后進入本輪投票的循環
- 從recvqueue獲取一個網絡包,如果沒有收到包則檢查是否要重連和重發自己的投票
- 收到投票后判斷投票的狀態
- LOOKING:
- 如果對方投票的周期大於自己的周期,那就清空自己的已經收到的投票集合recvset,並將自己作為候選和對方投票的leader做比較,選出大的作為新的投票,然后再發送給所有人。
- LOOKING:
這里比較大小是通過比較(zxid,sid)這個二元組來的,zxid大的就大,否則sid大的就大
- 如果對方的投票周期小於自己,則忽略對方的投票
- 如果周期相等,則比較對方的投票和自己認為的候選,選出大的作為新的候選,然后再發送給所有人
- 然后判斷當前收到的投票是否可以得出誰是leader的結論,這里主要是通過判斷當前的候選leader在收到的投票中是否占了多數
- 如果候選leader在收到的投票中占了多數,則再等待finalizeWait時鍾,看是否有人修改leader的候選,如果修改了則把投票放到recvqueue中再從新循環
- OBSERVING:如果對方是一個觀察者,由於它沒有投票權,則無視它
- FOLLOWING或LEADING:
- 如果對方和自己再一個時鍾周期,說明對方已經完成選舉,如果對方說它是leader,那我們就把它作為leader,否則就要比較下對方選舉的leader在自己這里是否占有多數,並且選舉的leader確認了願意當leader,如果都通過了,就把這個候選作為自己的leader
- 如果對方和自己不再一個時鍾周期,說明自己掛掉后又恢復起來,這個時候把別人的投票收集到一個單獨的集合outofelection(從名字可以看出這個集合不是用在選舉判斷),如果對方的投票在outofelection中占有大多數,並且leader也確認了自己願意做leader,這個時候更新自己的選舉周期logicalclock,並修改自己的狀態為FOLLOWING或LEADING
QuromPeer線程
- Leader選舉完成之后,Peer確認了自己是Leader的身份,在QuromPeer的主線程中執行Leader的邏輯
- 創建Leader對象,並創建Server綁定在QuorumAddress上,用於和其他Follower之間相互通信
- 調用Leader::lead函數,執行Leader的真正的邏輯
- 調用ZooKeeperServer::loadData,從磁盤中恢復數據和session列表
- 啟用新的epoch,zookeeper中的zxid是64位,用於唯一標示一個操作,zxid的高32位是epoch,每次Leader切換加1,低32位為序列號,每次操作加1
- 啟動綁定在QuorumAddress上的Server,為每個Follower的連接建立一個LearnerHandler,用於和Follower做交互,這里的邏輯另外單獨論述
- 向所有的Follower發送一個NEWLEADER包,宣告自己額Leader身份,並在initLimit時間內等待大多數的Follower完成和Leader的同步,並發送ACK包,表示Follower已經和Leader完成同步並可以對外提供服務
- 這時Leader和Client之間的交互在cnxnFactory的Server中,Leader和Follower之間的交互在LearnerHandler所屬的線程中
- 然后調用Leader::lead函數的QuromPeer線程在每個tickTime中都會發送2個ping消息給其他的follower,follower在接收到ping消息后會回復一個ping消息,並附帶上follower的session tracker里的所有session信息,leader收到follower的ping消息后,根據傳回的session信息更新自己的session信息
LearnerHandler線程
- LearnerHandler主要是處理Leader和Follower之間的交互,和每個Follower連接會維持一個長連接,並有一個單獨的LearnerHandler線程和一個Follower進行交互
- 當Follower和Leader建立連接后,會先發一個FOLLOWERINFO包,包含了follower的server id和最近的一個zxid,即peerLastZxid
- 根據peerLastZxid來判斷如何與Follower進行同步
- 如果peerLastZxid大於leader的最新的zxid,則給follower發送trunc包,讓follower刪掉多出來的事務,一般來說這種情況比較少
- 如果peerLastZxid小於leader的最新的zxid,則給follower發送diff包,讓follower補齊和leader之間的差距
同步時發送包的順序如下:
- NEWLEADER(同步發送)
- DIFF(同步發送)
以下包的發送在一個線程中異步發送
- 循環發送寫入磁盤的txn和commit包
- 循環發送已經commit但還未寫入磁盤的toBeApplied數組的txn和commit包
- 循環發送已經提出proposal但還未commit的outstandingProposals數組中的txn,注意這里沒有發送commit包
- 為了和follower做快速的同步,leader會在內存中緩存一部分最近的事務,即minCommittedLog和maxCommittedLog之間的事務,如果peerLastZxid比minCommittedLog還小的話,leader就給follower發送一個snap包,把當前leader的鏡像發給follower
- 同步等待第一個回復的ACK包,然后計算同步超時tickTime*syncLimit,同步的后續的ACK包在下面的循環中處理
- 循環處理和follower之間交互的包
- ACK包:調用leader.processAck方法,processAck函數的執行邏輯如下:
- 如果Ack包的zxid小於Leader的lastCommitted,則忽略
- 根據ack包的zxid,在outstandingProposals中找出對應的proposal
- 將ack包對應的follower的sid加入proposal的ackset,如果ackset中超過大多數,則表示這個proposal可以commit
- 從outstandingProposals中刪除這個proposal,並把這個proposal加入到已經可以commit的toBeApplied數組中
- 向follower發送commit包,通知follower將proposal提交
- PING包:用於和follower同步session信息
- REQUEST包:follower轉發過來的修改狀態的請求,調用ZooKeeperServer::submitRequest方法,這個方法后面單獨論述
- ACK包:調用leader.processAck方法,processAck函數的執行邏輯如下:
NIOServerCnxn::Factory線程
- 該線程主要負責server和client的交互
- 該server是基於select的,當有客戶端連接server時,會調用doIO邏輯,這里會把socket上的數據讀取出來解析並處理(readPayload函數),並把需要寫出的outgoingBuffers寫入socket
- 如果是剛連接上,則調用readConnectRequest,這里會調用
submitRequest(cnxn,sessionId, OpCode.createSession, 0, to, null);
實際是發起一個創建session的請求
- 如果不是第一次連接,則調用readRequest函數,這里會從socket上讀出Request數據,然后調用submitRequest
我們可以看到來自client的請求和來自其他server的請求都會調用submitRequest函數,這個函數會調用server上的RequestProcessor鏈,server實現的是責任鏈模式,每個請求都會經過責任鏈里所有RequestProcessor的處理
對於Leader來說,LeaderZooKeeperServer::setupRequestProcessors設置了Leader用到的責任鏈,按從前到后的順序如下:
- PrepRequestProcessor:創建和修改狀態的Request關聯的header和txn
- ProposalRequestProcessor:將寫請求發送proposal到所有的follower
- SyncRequestProcessor:將發出去的proposal批量寫入磁盤
- AckRequestProcessor:當proposal真正寫入了磁盤后,向本機發送ack包
- CommitProcessor:匹配本地submitted的請求和收到的committed的請求
- ToBeAppliedRequestProcessor:把寫入到磁盤的proposal從toBeApplied中刪除
- finalProcessor:把commit的proposal寫入到本機的內存狀態中