Zookeeper源碼分析


Zookeeper的啟動流程

  1. zookeeper的主類是QuorumPeerMain,啟動時讀取zoo.cfg配置文件,如果沒有配置server列表,則單機模式啟動,否則按集群模式啟動,這里只分析集群模式
  1. 根據配置初始化quorumPeer對象,並啟動quorumPeer線程,這里主要做了幾件事情
    1. 讀取保存在磁盤上的數據,包括dbsnapshottxnlogzookeeper的存儲結構另外專門講述
    1. 啟動cnxnFactory,這里主要是啟動一個server,用來接收來自client的請求,綁定在配置文件中的clientPort端口
    1. QuorumAddress上綁定一個server,用來和其他zookeeper server做交互
    2. 啟動leader選舉過程,因為server剛啟動時是存在LOOKING狀態,需要發一起一次選舉過程來獲取leader
    1. 啟動quorumPeer的主線程run,根據當前節點的狀態來啟動不同的流程
      1. 如果是Looking狀態,則調用FastLeaderElection::lookForLeader來發起選舉流程
      1. 如果是OBSERVING狀態,則開始Observer流程
      1. 如果是FOLLOWING狀態,則開始Follower流程
      1. 如果是LEADING狀態,則開始Leader流程

Zookeeper的選舉流程 

  1. 發起選舉流程有兩種情況:

        1)server剛啟動的時候,server的狀態初始化為LOOKING狀態

        2)server發生異常,切換到LOOKING狀態

        server4中狀態

  1. LOOKING:初始狀態,表示在選舉leader
  2. FOLLOWING:跟隨leader的角色,參與投票
  3. LEADING:集群的leader
  4. OBSERVING:不參與投票,只是同步狀態
  1. 按剛啟動來講述選舉流程,QuorumPeer::start() -> QuorumPeer::startLeaderElection() -> QuorumPeer::createElectionAlgorithm
  1. 默認使用FastLeaderElection算法,初始化的流程如下:
    1. 初始化QuorumCnxManager,管理選舉中和其他server的交互,選舉時監聽在專門的electionAddr
    2. QuorumCnxManager是實際發生網絡交互的地方,它的主要數據結構包括
      1. queueSendMap:sid -> buffer queue,為每個參與投票的server都保留一個隊列
      2. recvQueue:message queue,所有收到的消息都放到recvQueue
      3. listenerserver主線程,收發消息時和上面兩個隊列交互

QuorumCnxManager可以保證每對peer之間只有一個鏈接,如果有server發起新的鏈接,則比較sidsid大的保留鏈接,小的放棄鏈接

  1. 初始化FastLeaderElection,這是選舉邏輯所在的地方,它主要包括3個線程:
    1. Messenger::WorkerReceiver:從QuorumCnxManager::recvQueue中獲取網絡包,並將其發到FastLeaderElection::recvqueue中
    2. Messenger::WorkerSender:從FastLeaderElection::sendqueue中獲取網絡包,並將其放到QuorumCnxManager::queueSendMap中,並發送到網絡上
    1. lookForLeader:QuorumPeer主線程會調用lookForLeader函數,它從recvqueue中獲取別人發給server的選舉數據,並將發給其他server的投票放到sendqueue中
  1. FastLeaderElection::lookForLeader中實現了選舉算法,具體的流程如下:
    1. 首先更新選舉周期logicalclock,並把自己作為leader作為投票發給所有其他的server
    1. 然后進入本輪投票的循環
      1. 從recvqueue獲取一個網絡包,如果沒有收到包則檢查是否要重連和重發自己的投票
      2. 收到投票后判斷投票的狀態
        1. LOOKING:
          1. 如果對方投票的周期大於自己的周期,那就清空自己的已經收到的投票集合recvset,並將自己作為候選和對方投票的leader做比較,選出大的作為新的投票,然后再發送給所有人。

這里比較大小是通過比較(zxidsid)這個二元組來的,zxid大的就大,否則sid大的就大

  1. 如果對方的投票周期小於自己,則忽略對方的投票
  1. 如果周期相等,則比較對方的投票和自己認為的候選,選出大的作為新的候選,然后再發送給所有人
  1. 然后判斷當前收到的投票是否可以得出誰是leader的結論,這里主要是通過判斷當前的候選leader在收到的投票中是否占了多數
  1. 如果候選leader在收到的投票中占了多數,則再等待finalizeWait時鍾,看是否有人修改leader的候選,如果修改了則把投票放到recvqueue中再從新循環
  1. OBSERVING:如果對方是一個觀察者,由於它沒有投票權,則無視它
  1. FOLLOWING或LEADING:
    1. 如果對方和自己再一個時鍾周期,說明對方已經完成選舉,如果對方說它是leader,那我們就把它作為leader,否則就要比較下對方選舉的leader在自己這里是否占有多數,並且選舉的leader確認了願意當leader,如果都通過了,就把這個候選作為自己的leader
    2. 如果對方和自己不再一個時鍾周期,說明自己掛掉后又恢復起來,這個時候把別人的投票收集到一個單獨的集合outofelection(從名字可以看出這個集合不是用在選舉判斷),如果對方的投票在outofelection中占有大多數,並且leader也確認了自己願意做leader,這個時候更新自己的選舉周期logicalclock,並修改自己的狀態為FOLLOWING或LEADING

 Leader執行流程

QuromPeer線程

  1. Leader選舉完成之后,Peer確認了自己是Leader的身份,在QuromPeer的主線程中執行Leader的邏輯
  1. 創建Leader對象,並創建Server綁定在QuorumAddress上,用於和其他Follower之間相互通信
  1. 調用Leader::lead函數,執行Leader的真正的邏輯
    1. 調用ZooKeeperServer::loadData,從磁盤中恢復數據和session列表
    2. 啟用新的epoch,zookeeper中的zxid64位,用於唯一標示一個操作,zxid的高32位是epoch,每次Leader切換加1,低32位為序列號,每次操作加1
    1. 啟動綁定在QuorumAddress上的Server,為每個Follower的連接建立一個LearnerHandler,用於和Follower做交互,這里的邏輯另外單獨論述
    1. 向所有的Follower發送一個NEWLEADER包,宣告自己額Leader身份,並在initLimit時間內等待大多數的Follower完成和Leader的同步,並發送ACK包,表示Follower已經和Leader完成同步並可以對外提供服務
    1. 這時LeaderClient之間的交互在cnxnFactoryServer中,LeaderFollower之間的交互在LearnerHandler所屬的線程中
    1. 然后調用Leader::lead函數的QuromPeer線程在每個tickTime中都會發送2ping消息給其他的followerfollower在接收到ping消息后會回復一個ping消息,並附帶上followersession tracker里的所有session信息,leader收到followerping消息后,根據傳回的session信息更新自己的session信息

LearnerHandler線程

  1. LearnerHandler主要是處理LeaderFollower之間的交互,和每個Follower連接會維持一個長連接,並有一個單獨的LearnerHandler線程和一個Follower進行交互
  2. FollowerLeader建立連接后,會先發一個FOLLOWERINFO包,包含了followerserver id和最近的一個zxid,即peerLastZxid
  1. 根據peerLastZxid來判斷如何與Follower進行同步
    1. 如果peerLastZxid大於leader的最新的zxid,則給follower發送trunc包,讓follower刪掉多出來的事務,一般來說這種情況比較少
    2. 如果peerLastZxid小於leader的最新的zxid,則給follower發送diff包,讓follower補齊和leader之間的差距

同步時發送包的順序如下:

  • NEWLEADER(同步發送)
  • DIFF(同步發送)

以下包的發送在一個線程中異步發送

  • 循環發送寫入磁盤的txncommit
  • 循環發送已經commit但還未寫入磁盤的toBeApplied數組的txncommit
  • 循環發送已經提出proposal但還未commit的outstandingProposals數組中的txn,注意這里沒有發送commit
  1. 為了和follower做快速的同步,leader會在內存中緩存一部分最近的事務,即minCommittedLog和maxCommittedLog之間的事務,如果peerLastZxid比minCommittedLog還小的話,leader就給follower發送一個snap包,把當前leader的鏡像發給follower
  1. 同步等待第一個回復的ACK包,然后計算同步超時tickTime*syncLimit,同步的后續的ACK包在下面的循環中處理
  2. 循環處理和follower之間交互的包
    1. ACK包:調用leader.processAck方法,processAck函數的執行邏輯如下:
      1. 如果Ack包的zxid小於Leader的lastCommitted,則忽略
      2. 根據ack包的zxid,在outstandingProposals中找出對應的proposal
      1. ack包對應的followersid加入proposalackset,如果ackset中超過大多數,則表示這個proposal可以commit
      1. 從outstandingProposals中刪除這個proposal,並把這個proposal加入到已經可以commit的toBeApplied數組中
      2. follower發送commit包,通知followerproposal提交
    1. PING包:用於和follower同步session信息
    2. REQUEST包:follower轉發過來的修改狀態的請求,調用ZooKeeperServer::submitRequest方法,這個方法后面單獨論述

 

NIOServerCnxn::Factory線程

  1. 該線程主要負責serverclient的交互
  1. server是基於select的,當有客戶端連接server時,會調用doIO邏輯,這里會把socket上的數據讀取出來解析並處理(readPayload函數),並把需要寫出的outgoingBuffers寫入socket
  1. 如果是剛連接上,則調用readConnectRequest,這里會調用

submitRequest(cnxn,sessionId, OpCode.createSession, 0, to, null);

實際是發起一個創建session的請求

  1. 如果不是第一次連接,則調用readRequest函數,這里會從socket上讀出Request數據,然后調用submitRequest

 

我們可以看到來自client的請求和來自其他server的請求都會調用submitRequest函數,這個函數會調用server上的RequestProcessor鏈,server實現的是責任鏈模式,每個請求都會經過責任鏈里所有RequestProcessor的處理

 

對於Leader來說,LeaderZooKeeperServer::setupRequestProcessors設置了Leader用到的責任鏈,按從前到后的順序如下:

  • PrepRequestProcessor:創建和修改狀態的Request關聯的headertxn
  • ProposalRequestProcessor:將寫請求發送proposal到所有的follower
  • SyncRequestProcessor:將發出去的proposal批量寫入磁盤
  • AckRequestProcessor:當proposal真正寫入了磁盤后,向本機發送ack
  • CommitProcessor:匹配本地submitted的請求和收到的committed的請求
  • ToBeAppliedRequestProcessor:把寫入到磁盤的proposal從toBeApplied中刪除
  • finalProcessor:把commitproposal寫入到本機的內存狀態中


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM