Zookeeper之Zookeeper的Client的分析


1)幾個重要概念 

  • ZooKeeper:客戶端入口

  • Watcher:客戶端注冊的callback

  • ZooKeeper.SendThread: IO線程

  • ZooKeeper.EventThread: 事件處理線程,處理各類消息callback

  • ClientCnxnSocketNIO:繼承自ClientCnxnSocket,專門處理IO

 

2)zookeeper初始化

    • 應用提供watch實例

    • 實例化zookeeper

      • 實例化socket,默認使用ClientCnxnSocketNIO,可通過zookeeper.clientCnxnSocket配置定制

      • 實例化ClientCnxn

      • 實例化SendThread

      • 實例化EventThread

    • 啟動zookeeper

      • 啟動SendThread

        • 連接服務器(見SendThread.startConnect)

          • 產生真正的socket,見ClientCnxnSocketNIO.createSock

          • 向select注冊一個OP_CONNECT事件並連接服務器,由於是非阻塞連接,此時有可能並不會立即連上,如果連上就會調用SendThread.primeConnection初始化連接來注冊讀寫事件,否則會在接下來的輪詢select獲取連接事件中處理

          • 復位socket的incomingBuffer

 

          • 連接成功后會產生一個connect型的請求發給服務,用於獲取本次連接的sessionid

          • 進入循環等待來自應用的請求,如果沒有就根據時間來ping 服務器

 

    • 啟動EventThread

      • 開始進入無限循環,從隊列waitingEvents中獲取事件,如果沒有就阻塞等待

 

3)以一個請求為例以 zk.exists("/root", false)為例

  • 客戶端線程

    • 構造一個exists類型的請求,請求類型見ZooDefs.OpCode

    • 將請求構造成一個Packet,並將該packet放入outgoingQueue

      • 喚醒select

    • 阻塞等待結果

  • SendThread

    • 通過select 輪詢判斷是否有socket准備好,如果能讀就讀,能寫就寫

    • 此時socket准備好寫了 ,就從outgoingQueue獲取packet, 將packet發送到服務端 

    • 一旦發送了一個完整的packet,就將packet從outgoingQueue移除

    • 最后將packet加入到pendingQueue

    • 再次select輪詢看是否有響應數據,如果有首先都去4個字節的響應頭(包含響應的長度信息),然后在下一次遍歷中都去響應體

    • 都到響應將packet從pendingQueue移除

    • 如果該請求packet帶有一個callback,那么會將此packet放入waitingEvents隊列,讓EventThread去處理

    • 最后會調用p.notifyAll()解鎖,於是應用線程從阻塞中出來

  • 如果使用了帶callback 的exists,EventThread會干活

 

4)小結

4.1)

SendThread也並非完全對應與請求/響應模式,SendThread也會接受到節點變化的通知,此時客戶端變成了服務端

 

4.2)時間和超時的控制

ClientCnxnSocket作為ClientCnxnSocketNIO的父類,

有3個關鍵的時間字段

  • now :每次輪詢select之前更新,或者發生錯誤是在catch段中更新為當前時間

  • lastHeard:在讀取了響應,包括上面提到的connect型請求和常規命令型請求的響應以及完成網絡連接時更新為當前時間

  • lastSend:每次發送完ping 命令和請求以及完成網絡連接時更新為當前時間

有下面幾個超時設置

  • sessionTimeout:zookeeper初始化時設置的

  • readTimeout:sessionTimeout * 2 / 3

  • connectTimeout:sessionTimeout / hostProvider.size();  //hostProvider.size()為zookeeper服務器個數

  • getIdleRecv():now - lastHeard

  • getIdleSend():now - lastSend

  • SessionTimeout的計算

    • 如果沒有完成連接to=connectTimeout - getIdleRecv()

    • 如果完成連接to=readTimeout - getIdleRecv()

    • 如果to<=0  就會拋出SessionTimeoutException

4.3)什么時候ping

   計算timeToNextPing = readTimeout / 2-getIdleSend()

  如果timeToNextPing <= 0,發送ping請求(只是將ping請求放入outgoingQueue,並不發生IO)

   

4.4)select阻塞多久

如果上述的0<timeToNextPing<to,那么阻塞時長為timeToNextPing,否則為to

如果有寫請求,select會被喚醒

 

4.5)sendThread的工作原理

該線程作為zookeeper客戶端的核心部分專門負責IO處理 

  • 計算select timeout(上面提到的to)

  • 檢查空閑時間,有可能拋出SessionTimeoutException或者發送ping

  • 使用select輪詢,獲取網絡事件(連接、讀、寫)也就是這3類

    • 如果是連接,做連接處理

    • 如果讀,過程如下

      • 讀取消息頭,4個字節,頭包含了消息體的字節數

      • 讀 取消息體,分為兩個大類消息,連接型消息“connect”和非連接型消息“header”,前者上面提到過就是連接完成之后發的一種消息,用於確定 sessionid, 另外前者會調用sendThread.onConnected,后者會調用sendThread.readResponse

      • 非連接型消息有分為幾類

        • ping 消息

        • auth認證消息 

        • 訂閱的消息,即各種變化的通知,比如子節點變化、節點內容變化,由服務器推過來的消息 ,獲取到這類消息或通過eventThread.queueEvent將消息推入事件隊列

        • 客戶端命令的response,如果此消息帶有callback着通過eventThread.queuePacket推入事件隊列,否者喚醒阻塞的應用線程,注意到客戶端命令都會有阻塞版本和異步版本(帶callback)

    • 如果是寫,就從outgoingQueue獲取packet,寫入網絡

4.6)請求中的Watcher和StatCallback的差別

兩個都是callback,兩者都由EventThread,但后者控制調用線程是否會阻塞等待響應

 

4.7)IO模型

如圖

 

 

  • 沒有使用傳統連接池,會和zookeeper集群中的一台相連

  • 單IO線程(NIO)+事件線程,很標准的NIO模式


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM